From 63ade621dc0045e7eeba2c76ba1dfbf146d4d8d6 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 18 Sep 2025 17:10:43 -0700 Subject: [PATCH 1/4] MINOR: fix incorrect offset reset logging --- .../kafka/streams/processor/internals/StreamThread.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d893220621721..58a7adeed35d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1647,14 +1647,14 @@ private void resetOffsets(final Set partitions, final Exception addToResetList( partition, seekToBeginning, - "Setting topic '{}' to consume from earliest offset", + "earliest", loggedTopics ); } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) { addToResetList( partition, seekToEnd, - "Setting topic '{}' to consume from latest offset", + "latest", loggedTopics ); } else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) { @@ -1678,14 +1678,14 @@ private void resetOffsets(final Set partitions, final Exception addToResetList( partition, seekToBeginning, - "No custom setting defined for topic '{}' using original config 'earliest' for offset reset", + "earliest", loggedTopics ); } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) { addToResetList( partition, seekToEnd, - "No custom setting defined for topic '{}' using original config 'latest' for offset reset", + "latest", loggedTopics ); } else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) { From c244ad35988434a5b8c1911ca4c67f77b07f2d07 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 18 Sep 2025 19:10:35 -0700 Subject: [PATCH 2/4] update --- .../streams/processor/internals/StreamThread.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 58a7adeed35d5..8d7b24cb285f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1647,14 +1647,14 @@ private void resetOffsets(final Set partitions, final Exception addToResetList( partition, seekToBeginning, - "earliest", + "Setting topic '{}' to consume from earliest offset", loggedTopics ); } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) { addToResetList( partition, seekToEnd, - "latest", + "Setting topic '{}' to consume from latest offset", loggedTopics ); } else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) { @@ -1678,14 +1678,14 @@ private void resetOffsets(final Set partitions, final Exception addToResetList( partition, seekToBeginning, - "earliest", + "No custom setting defined for topic '{}' using original config earliest for offset reset", loggedTopics ); } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) { addToResetList( partition, seekToEnd, - "latest", + "No custom setting defined for topic '{}' using original config latest for offset reset", loggedTopics ); } else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) { @@ -1778,12 +1778,12 @@ private void resetOffsets(final Set partitions, final Exception private void addToResetList( final TopicPartition partition, final Set partitions, - final String resetPolicy, + final String logMessage, final Set loggedTopics ) { final String topic = partition.topic(); if (loggedTopics.add(topic)) { - log.info("Setting topic '{}' to consume from {} offset", topic, resetPolicy); + log.info(logMessage, topic); } partitions.add(partition); } From 5958db08b13ce2a3f1e02e0d4c5ffacd963aa83e Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 18 Sep 2025 19:14:23 -0700 Subject: [PATCH 3/4] cleanup --- .../streams/processor/internals/StreamThread.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 8d7b24cb285f6..03ebb104621e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1647,14 +1647,14 @@ private void resetOffsets(final Set partitions, final Exception addToResetList( partition, seekToBeginning, - "Setting topic '{}' to consume from earliest offset", + "Setting topic '{}' to consume from 'earliest' offset", loggedTopics ); } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) { addToResetList( partition, seekToEnd, - "Setting topic '{}' to consume from latest offset", + "Setting topic '{}' to consume from 'latest' offset", loggedTopics ); } else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) { @@ -1662,7 +1662,7 @@ private void resetOffsets(final Set partitions, final Exception partition, seekByDuration, resetPolicy.duration().get(), - "Setting topic '{}' to consume from by_duration:{}", + "Setting topic '{}' to consume from 'by_duration:{}'", resetPolicy.duration().get().toString(), loggedTopics ); @@ -1678,14 +1678,14 @@ private void resetOffsets(final Set partitions, final Exception addToResetList( partition, seekToBeginning, - "No custom setting defined for topic '{}' using original config earliest for offset reset", + "No custom setting defined for topic '{}' using original config 'earliest' for offset reset", loggedTopics ); } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) { addToResetList( partition, seekToEnd, - "No custom setting defined for topic '{}' using original config latest for offset reset", + "No custom setting defined for topic '{}' using original config 'latest' for offset reset", loggedTopics ); } else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) { From b1e862f8e7bf8e9493c0b4ed8d1ab7d2e7e611ce Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 19 Sep 2025 17:55:09 -0700 Subject: [PATCH 4/4] review comment --- .../apache/kafka/streams/processor/internals/StreamThread.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 03ebb104621e7..649a1ec666cd0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1532,6 +1532,7 @@ private ConsumerRecords pollRequests(final Duration pollTime) { try { records = mainConsumer.poll(pollTime); } catch (final InvalidOffsetException e) { + log.info("Found no valid offset for {} partitions, resetting.", e.partitions().size()); resetOffsets(e.partitions(), e); }