From 2b0c15d352d9bc6c13bfbf6a54a6ac5ff12a6085 Mon Sep 17 00:00:00 2001
From: ShivramSriramulu
Date: Tue, 9 Sep 2025 10:54:41 -0700
Subject: [PATCH] MM2: Add fault-tolerance enhancements to MirrorSourceTask

- Add fail-fast truncation detection with detailed error logging
- Add graceful topic-reset handling with auto-recovery
- Add configuration toggles for the fault-tolerance features
- Add AdminClient-based topic ID tracking for reset detection
- Add seekToBeginning for topic-reset recovery
- Maintain backward compatibility with existing MM2 behavior

New configuration properties (defaults shown):
- mirrorsource.fail.on.truncation=true
- mirrorsource.auto.recover.on.reset=true
- mirrorsource.topic.reset.retry.ms=5000

This addresses silent data-loss scenarios and improves resilience during
planned maintenance operations that involve topic resets.
---
 .../connect/mirror/MirrorSourceTask.java | 81 ++++++++++++++++++++
 1 file changed, 81 insertions(+)
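
For reviewers, a configuration sketch. It assumes the new mirrorsource.* keys are
forwarded unchanged from the per-flow MM2 config into the MirrorSourceTask task
properties; the cluster aliases and the "source->target." prefixes below are
hypothetical and untested.

    # mm2.properties (hypothetical example; the values shown are the patch defaults)
    clusters = source, target
    source->target.enabled = true
    source->target.topics = .*

    # Fault-tolerance toggles introduced by this patch
    source->target.mirrorsource.fail.on.truncation = true
    source->target.mirrorsource.auto.recover.on.reset = true
    source->target.mirrorsource.topic.reset.retry.ms = 5000
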
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 6ab65bebdca2e..b2f455e380131 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -16,6 +16,18 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,6 +58,7 @@ public class MirrorSourceTask extends SourceTask {
 
     private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);
+    private static final Logger FT_LOG = LoggerFactory.getLogger("mm2.fault.tolerance");
 
     private KafkaConsumer<byte[], byte[]> consumer;
     private String sourceClusterAlias;
     private Duration pollTimeout;
@@ -55,6 +68,13 @@ public class MirrorSourceTask extends SourceTask {
     private boolean stopping = false;
     private Semaphore consumerAccess;
     private OffsetSyncWriter offsetSyncWriter;
+
+    // Fault tolerance enhancements
+    private Admin sourceAdmin;
+    private final ConcurrentHashMap<String, Uuid> topicIds = new ConcurrentHashMap<>();
+    private boolean failOnTruncation = true;
+    private boolean autoRecoverOnReset = true;
+    private long topicResetRetryMs = 5000L;
 
     public MirrorSourceTask() {}
 
@@ -85,6 +105,15 @@ public void start(Map<String, String> props) {
         consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer"));
         Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
         initializeConsumer(taskTopicPartitions);
+        // Fault tolerance configuration
+        this.failOnTruncation = Boolean.parseBoolean(props.getOrDefault("mirrorsource.fail.on.truncation", "true"));
+        this.autoRecoverOnReset = Boolean.parseBoolean(props.getOrDefault("mirrorsource.auto.recover.on.reset", "true"));
+        this.topicResetRetryMs = Long.parseLong(props.getOrDefault("mirrorsource.topic.reset.retry.ms", "5000"));
+
+        // Build an AdminClient for the source cluster (same configs as the source consumer)
+        Map<String, Object> adminProps = new HashMap<>(config.sourceConsumerConfig("replication-consumer"));
+        sourceAdmin = Admin.create(adminProps);
+
         log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(),
                 taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions);
     }
@@ -133,6 +162,7 @@ public List<SourceRecord> poll() {
         }
         try {
             ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
+            handleTopicResetIfAny();
             List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
             for (ConsumerRecord<byte[], byte[]> record : records) {
                 SourceRecord converted = convertRecord(record);
@@ -148,6 +178,20 @@ public List<SourceRecord> poll() {
                 log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
                 return sourceRecords;
             }
+        } catch (OffsetOutOfRangeException oore) {
+            if (!failOnTruncation) throw oore;
+            // Fail fast with precise diagnostics
+            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(consumer.assignment());
+            String msg = "TRUNCATION_DETECTED: Source offsets are out of range (likely retention purge). "
+                    + "Earliest per partition=" + earliest + ", assignment=" + consumer.assignment();
+            FT_LOG.error(msg, oore);
+            throw new ConnectException(msg, oore);
+        } catch (UnknownTopicOrPartitionException utpe) {
+            if (!autoRecoverOnReset) throw utpe;
+            FT_LOG.warn("TOPIC_RESET_SUSPECTED: {}. Will re-check metadata in {} ms.", utpe.getMessage(), topicResetRetryMs);
+            sleep(topicResetRetryMs);
+            handleTopicResetIfAny();
+            return Collections.emptyList();
         } catch (WakeupException e) {
             return null;
         } catch (KafkaException e) {
@@ -255,4 +299,41 @@ private static int byteSize(byte[] bytes) {
     private boolean isUncommitted(Long offset) {
         return offset == null || offset < 0;
     }
+
+    private void handleTopicResetIfAny() {
+        if (!autoRecoverOnReset) return;
+        // MM2 assigns partitions explicitly, so derive the topic set from the
+        // assignment; subscription() would be empty here.
+        Set<String> topics = consumer.assignment().stream()
+                .map(TopicPartition::topic)
+                .collect(Collectors.toSet());
+        if (topics.isEmpty()) return;
+        try {
+            DescribeTopicsResult res = sourceAdmin.describeTopics(topics);
+            Map<String, TopicDescription> desc = res.allTopicNames().get();
+            for (Map.Entry<String, TopicDescription> e : desc.entrySet()) {
+                String t = e.getKey();
+                Uuid current = e.getValue().topicId();
+                // put() rather than putIfAbsent() so the stored ID is refreshed once a reset is handled
+                Uuid previous = topicIds.put(t, current);
+                if (previous != null && !previous.equals(current)) {
+                    FT_LOG.warn("RESET_DETECTED: Topic {} recreated (oldId={} newId={}). Seeking to beginning.", t, previous, current);
+                    // Seek to beginning for all assigned partitions of this topic
+                    Set<TopicPartition> toSeek = consumer.assignment().stream()
+                            .filter(tp -> tp.topic().equals(t))
+                            .collect(Collectors.toSet());
+                    if (!toSeek.isEmpty()) consumer.seekToBeginning(toSeek);
+                }
+            }
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException ee) {
+            // If the topic doesn't exist yet, retry on the next poll
+            FT_LOG.debug("Topic describe failed; will retry: {}", ee.getMessage());
+        }
+    }
+
+    private static void sleep(long ms) {
+        try { Thread.sleep(ms); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
+    }
 }