connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -16,6 +16,16 @@
*/
package org.apache.kafka.connect.mirror;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,6 +56,7 @@
public class MirrorSourceTask extends SourceTask {

private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);
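// Separate named logger so fault-tolerance events can be filtered and routed independently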
private static final Logger FT_LOG = LoggerFactory.getLogger("mm2.fault.tolerance");

private KafkaConsumer<byte[], byte[]> consumer;
private String sourceClusterAlias;
@@ -55,6 +66,13 @@ public class MirrorSourceTask extends SourceTask {
private boolean stopping = false;
private Semaphore consumerAccess;
private OffsetSyncWriter offsetSyncWriter;

// Fault tolerance enhancements
private Admin sourceAdmin;
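// Last observed topic id per topic; a changed id means the topic was deleted and recreated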
private final ConcurrentHashMap<String, Uuid> topicIds = new ConcurrentHashMap<>();
private boolean failOnTruncation = true;
private boolean autoRecoverOnReset = true;
private long topicResetRetryMs = 5000L;

public MirrorSourceTask() {}

@@ -85,6 +103,15 @@ public void start(Map<String, String> props) {
Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
initializeConsumer(taskTopicPartitions);

// Fault tolerance configuration
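// mirrorsource.fail.on.truncation    - fail the task when source offsets have been purged (default: true)
// mirrorsource.auto.recover.on.reset - seek to beginning when a source topic is recreated (default: true)
// mirrorsource.topic.reset.retry.ms  - backoff before re-checking topic metadata (default: 5000)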
this.failOnTruncation = Boolean.parseBoolean(props.getOrDefault("mirrorsource.fail.on.truncation", "true"));
this.autoRecoverOnReset = Boolean.parseBoolean(props.getOrDefault("mirrorsource.auto.recover.on.reset", "true"));
this.topicResetRetryMs = Long.parseLong(props.getOrDefault("mirrorsource.topic.reset.retry.ms", "5000"));

// Build AdminClient for source cluster (same configs as source consumer)
Map<String, Object> adminProps = new HashMap<>(config.sourceConsumerConfig("replication-consumer"));
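// Consumer-only keys (deserializers, etc.) are not admin configs; the
// AdminClient ignores unknown keys, logging a warning at startup.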
sourceAdmin = Admin.create(adminProps);

log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(),
taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions);
}
@@ -133,6 +160,7 @@ public List<SourceRecord> poll() {
}
try {
ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
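// Check for topic recreation after each poll. Note this costs one
// describeTopics round trip per poll cycle (see handleTopicResetIfAny below).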
handleTopicResetIfAny();
List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
for (ConsumerRecord<byte[], byte[]> record : records) {
SourceRecord converted = convertRecord(record);
@@ -148,6 +176,20 @@
log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
return sourceRecords;
}
} catch (OffsetOutOfRangeException oore) {
if (!failOnTruncation) {
// Truncation tolerated: resume from the earliest offsets still available
FT_LOG.warn("Source offsets out of range for {}; seeking to beginning.", oore.partitions());
consumer.seekToBeginning(oore.partitions());
return Collections.emptyList();
}
// Fail fast with precise diagnostics
Map<TopicPartition, Long> earliest = consumer.beginningOffsets(oore.partitions());
String msg = "TRUNCATION_DETECTED: Source offsets are out of range (likely retention purge). " +
"Earliest per partition=" + earliest + ", affected partitions=" + oore.partitions();
FT_LOG.error(msg, oore);
throw new ConnectException(msg, oore);
} catch (UnknownTopicOrPartitionException utpe) {
if (!autoRecoverOnReset) throw utpe;
FT_LOG.warn("TOPIC_RESET_SUSPECTED: {}. Will retry metadata and resubscribe in {} ms.", utpe.getMessage(), topicResetRetryMs);
sleep(topicResetRetryMs);
handleTopicResetIfAny();
return Collections.emptyList();
} catch (WakeupException e) {
return null;
} catch (KafkaException e) {
@@ -255,4 +297,37 @@ private static int byteSize(byte[] bytes) {
private boolean isUncommitted(Long offset) {
return offset == null || offset < 0;
}

private void handleTopicResetIfAny() {
if (!autoRecoverOnReset) return;
// Describe all topics this task is assigned to. The task uses manual
// partition assignment, so consumer.subscription() would always be empty here.
Set<String> topics = consumer.assignment().stream()
.map(TopicPartition::topic)
.collect(Collectors.toSet());
if (topics.isEmpty()) return;
try {
DescribeTopicsResult res = sourceAdmin.describeTopics(topics);
Map<String, TopicDescription> desc = res.all().get();
for (Map.Entry<String, TopicDescription> e : desc.entrySet()) {
String t = e.getKey();
Uuid current = e.getValue().topicId();
Uuid previous = topicIds.putIfAbsent(t, current);
if (previous != null && !previous.equals(current)) {
FT_LOG.warn("RESET_DETECTED: Topic {} recreated (oldId={} newId={}). Seeking to beginning.", t, previous, current);
// Record the new id so the reset is handled once, not on every subsequent poll
topicIds.put(t, current);
// Seek to beginning for all assigned partitions of this topic
Set<TopicPartition> toSeek = consumer.assignment().stream()
.filter(tp -> tp.topic().equals(t))
.collect(Collectors.toSet());
if (!toSeek.isEmpty()) consumer.seekToBeginning(toSeek);
}
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
// If topic truly doesn't exist yet, we'll retry on next poll
FT_LOG.debug("Topic describe failed; will retry: {}", ee.getMessage());
}
}

private static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
}
}
mm2-fault-tolerance.patch (new file)
@@ -0,0 +1,120 @@
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -1,5 +1,15 @@
package org.apache.kafka.connect.mirror;

+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -20,6 +30,12 @@ public class MirrorSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);
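+ // Separate named logger so fault-tolerance events can be filtered and routed independently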
+ private static final Logger FT_LOG = LoggerFactory.getLogger("mm2.fault.tolerance");

private KafkaConsumer<byte[], byte[]> consumer;
private String sourceClusterAlias;
@@ -30,6 +36,12 @@ public class MirrorSourceTask extends SourceTask {
private boolean stopping = false;
private Semaphore consumerAccess;
private OffsetSyncWriter offsetSyncWriter;
+
+ // Fault tolerance enhancements
+ private Admin sourceAdmin;
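+ // Last observed topic id per topic; a changed id means the topic was deleted and recreated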
+ private final ConcurrentHashMap<String, Uuid> topicIds = new ConcurrentHashMap<>();
+ private boolean failOnTruncation = true;
+ private boolean autoRecoverOnReset = true;
+ private long topicResetRetryMs = 5000L;

public MirrorSourceTask() {}
@@ -89,6 +101,15 @@ public class MirrorSourceTask extends SourceTask {
consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer"));
Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
initializeConsumer(taskTopicPartitions);
+
+ // Fault tolerance configuration
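+ // mirrorsource.fail.on.truncation    - fail the task when source offsets have been purged (default: true)
+ // mirrorsource.auto.recover.on.reset - seek to beginning when a source topic is recreated (default: true)
+ // mirrorsource.topic.reset.retry.ms  - backoff before re-checking topic metadata (default: 5000)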
+ this.failOnTruncation = Boolean.parseBoolean(props.getOrDefault("mirrorsource.fail.on.truncation", "true"));
+ this.autoRecoverOnReset = Boolean.parseBoolean(props.getOrDefault("mirrorsource.auto.recover.on.reset", "true"));
+ this.topicResetRetryMs = Long.parseLong(props.getOrDefault("mirrorsource.topic.reset.retry.ms", "5000"));
+
+ // Build AdminClient for source cluster (same configs as source consumer)
+ Map<String, Object> adminProps = new HashMap<>(config.sourceConsumerConfig("replication-consumer"));
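+ // Consumer-only keys (deserializers, etc.) are not admin configs; the
+ // AdminClient ignores unknown keys, logging a warning at startup.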
+ sourceAdmin = Admin.create(adminProps);

log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(),
taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions);
@@ -150,6 +171,7 @@ public class MirrorSourceTask extends SourceTask {
try {
ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
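+ // Check for topic recreation after each poll. Note this costs one
+ // describeTopics round trip per poll cycle (see handleTopicResetIfAny below).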
+ handleTopicResetIfAny();
List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
for (ConsumerRecord<byte[], byte[]> record : records) {
SourceRecord converted = convertRecord(record);
@@ -163,6 +185,20 @@ public class MirrorSourceTask extends SourceTask {
log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
return sourceRecords;
}
+ } catch (OffsetOutOfRangeException oore) {
+ if (!failOnTruncation) {
+ // Truncation tolerated: resume from the earliest offsets still available
+ FT_LOG.warn("Source offsets out of range for {}; seeking to beginning.", oore.partitions());
+ consumer.seekToBeginning(oore.partitions());
+ return Collections.emptyList();
+ }
+ // Fail fast with precise diagnostics
+ Map<TopicPartition, Long> earliest = consumer.beginningOffsets(oore.partitions());
+ String msg = "TRUNCATION_DETECTED: Source offsets are out of range (likely retention purge). " +
+ "Earliest per partition=" + earliest + ", affected partitions=" + oore.partitions();
+ FT_LOG.error(msg, oore);
+ throw new ConnectException(msg, oore);
+ } catch (UnknownTopicOrPartitionException utpe) {
+ if (!autoRecoverOnReset) throw utpe;
+ FT_LOG.warn("TOPIC_RESET_SUSPECTED: {}. Will retry metadata and resubscribe in {} ms.", utpe.getMessage(), topicResetRetryMs);
+ sleep(topicResetRetryMs);
+ handleTopicResetIfAny();
+ return Collections.emptyList();
} catch (WakeupException e) {
return null;
} catch (KafkaException e) {
@@ -294,6 +330,35 @@ public class MirrorSourceTask extends SourceTask {
private boolean isUncommitted(Long offset) {
return offset == null || offset < 0;
}
+
+ private void handleTopicResetIfAny() {
+ if (!autoRecoverOnReset) return;
+ // Describe all topics this task is assigned to. The task uses manual
+ // partition assignment, so consumer.subscription() would always be empty here.
+ Set<String> topics = consumer.assignment().stream()
+ .map(TopicPartition::topic)
+ .collect(Collectors.toSet());
+ if (topics.isEmpty()) return;
+ try {
+ DescribeTopicsResult res = sourceAdmin.describeTopics(topics);
+ Map<String, TopicDescription> desc = res.all().get();
+ for (Map.Entry<String, TopicDescription> e : desc.entrySet()) {
+ String t = e.getKey();
+ Uuid current = e.getValue().topicId();
+ Uuid previous = topicIds.putIfAbsent(t, current);
+ if (previous != null && !previous.equals(current)) {
+ FT_LOG.warn("RESET_DETECTED: Topic {} recreated (oldId={} newId={}). Seeking to beginning.", t, previous, current);
+ // Record the new id so the reset is handled once, not on every subsequent poll
+ topicIds.put(t, current);
+ // Seek to beginning for all assigned partitions of this topic
+ Set<TopicPartition> toSeek = consumer.assignment().stream()
+ .filter(tp -> tp.topic().equals(t))
+ .collect(Collectors.toSet());
+ if (!toSeek.isEmpty()) consumer.seekToBeginning(toSeek);
+ }
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException ee) {
+ // If topic truly doesn't exist yet, we'll retry on next poll
+ FT_LOG.debug("Topic describe failed; will retry: {}", ee.getMessage());
+ }
+ }
+
+ private static void sleep(long ms) {
+ try { Thread.sleep(ms); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
+ }
}