From f313748ecbcbc9823c9219563c1eed854f69ed26 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 21 Jan 2026 13:56:33 -0500 Subject: [PATCH 1/7] Bump version to 4.1.5-SNAPSHOT --- pom.xml | 2 +- psc-common/pom.xml | 2 +- psc-examples/pom.xml | 4 ++-- psc-flink-logging/pom.xml | 4 ++-- psc-flink/pom.xml | 2 +- psc-integration-test/pom.xml | 2 +- psc-logging/pom.xml | 2 +- psc/pom.xml | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index 4ce36e7..52ba8d1 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ psc-java-oss PubSub Client (PSC) https://github.com/pinterest/psc - 4.1.4 + 4.1.5-SNAPSHOT pom psc-java-oss diff --git a/psc-common/pom.xml b/psc-common/pom.xml index 9f637ff..280c9fa 100644 --- a/psc-common/pom.xml +++ b/psc-common/pom.xml @@ -5,7 +5,7 @@ psc-java-oss com.pinterest.psc - 4.1.4 + 4.1.5-SNAPSHOT ../pom.xml 4.0.0 diff --git a/psc-examples/pom.xml b/psc-examples/pom.xml index ff201f8..815f2a2 100644 --- a/psc-examples/pom.xml +++ b/psc-examples/pom.xml @@ -5,13 +5,13 @@ psc-java-oss com.pinterest.psc - 4.1.4 + 4.1.5-SNAPSHOT ../pom.xml 4.0.0 psc-examples - 4.1.4 + 4.1.5-SNAPSHOT psc-examples diff --git a/psc-flink-logging/pom.xml b/psc-flink-logging/pom.xml index a430014..372df67 100644 --- a/psc-flink-logging/pom.xml +++ b/psc-flink-logging/pom.xml @@ -5,13 +5,13 @@ psc-java-oss com.pinterest.psc - 4.1.4 + 4.1.5-SNAPSHOT ../pom.xml 4.0.0 psc-flink-logging - 4.1.4 + 4.1.5-SNAPSHOT diff --git a/psc-flink/pom.xml b/psc-flink/pom.xml index d76eaaa..f79e060 100644 --- a/psc-flink/pom.xml +++ b/psc-flink/pom.xml @@ -5,7 +5,7 @@ com.pinterest.psc psc-java-oss - 4.1.4 + 4.1.5-SNAPSHOT ../pom.xml psc-flink diff --git a/psc-integration-test/pom.xml b/psc-integration-test/pom.xml index 3b12520..b3b727f 100644 --- a/psc-integration-test/pom.xml +++ b/psc-integration-test/pom.xml @@ -5,7 +5,7 @@ psc-java-oss com.pinterest.psc - 4.1.4 + 4.1.5-SNAPSHOT ../pom.xml 4.0.0 diff --git a/psc-logging/pom.xml b/psc-logging/pom.xml index 398b0f2..df172b0 100644 --- a/psc-logging/pom.xml +++ b/psc-logging/pom.xml @@ -5,7 +5,7 @@ psc-java-oss com.pinterest.psc - 4.1.4 + 4.1.5-SNAPSHOT ../pom.xml 4.0.0 diff --git a/psc/pom.xml b/psc/pom.xml index fb7054d..647e26e 100644 --- a/psc/pom.xml +++ b/psc/pom.xml @@ -5,7 +5,7 @@ psc-java-oss com.pinterest.psc - 4.1.4 + 4.1.5-SNAPSHOT ../pom.xml 4.0.0 From 5b56075e2f822a4d469024281704da0afd161ac6 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 21 Jan 2026 19:37:40 -0500 Subject: [PATCH 2/7] Throw RuntimeException in PscTopicUriPartitionSplitReader --- .../psc/source/reader/PscTopicUriPartitionSplitReader.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java index e6e6f77..3888052 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java @@ -108,7 +108,7 @@ public RecordsWithSplitIds> fetch() throws IO PscConsumerMessagesIterable consumerMessagesIterable; try { consumerMessagesIterable = new PscConsumerMessagesIterable<>(consumer.poll(Duration.ofMillis(POLL_TIMEOUT))); - } catch (ConsumerException e) { + } catch (WakeupException | IllegalStateException e) { // IllegalStateException will be thrown if the consumer is not assigned any partitions. // This happens if all assigned partitions are invalid or empty (starting offset >= // stopping offset). We just mark empty partitions as finished and return an empty @@ -118,6 +118,9 @@ public RecordsWithSplitIds> fetch() throws IO PscConsumerMessagesIterable.emptyIterable(), pscSourceReaderMetrics); markEmptySplitsAsFinished(recordsBySplits); return recordsBySplits; + } catch (ConsumerException ce) { + // all other exceptions + throw new RuntimeException(ce); } PscPartitionSplitRecords recordsBySplits = new PscPartitionSplitRecords(consumerMessagesIterable, pscSourceReaderMetrics); From fd93aa6bca1bb6412ef75af108d8f5fcb6b592d7 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 26 Jan 2026 11:56:13 -0500 Subject: [PATCH 3/7] Catch IllegalStateException --- .../PscTopicUriPartitionSplitReader.java | 19 ++++++++++--------- .../pinterest/psc/consumer/PscConsumer.java | 4 ++-- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java index 3888052..025e21e 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java @@ -108,19 +108,20 @@ public RecordsWithSplitIds> fetch() throws IO PscConsumerMessagesIterable consumerMessagesIterable; try { consumerMessagesIterable = new PscConsumerMessagesIterable<>(consumer.poll(Duration.ofMillis(POLL_TIMEOUT))); - } catch (WakeupException | IllegalStateException e) { + } catch (ConsumerException e) { // IllegalStateException will be thrown if the consumer is not assigned any partitions. // This happens if all assigned partitions are invalid or empty (starting offset >= // stopping offset). We just mark empty partitions as finished and return an empty // record container, and this consumer will be closed by SplitFetcherManager. - PscPartitionSplitRecords recordsBySplits = - new PscPartitionSplitRecords( - PscConsumerMessagesIterable.emptyIterable(), pscSourceReaderMetrics); - markEmptySplitsAsFinished(recordsBySplits); - return recordsBySplits; - } catch (ConsumerException ce) { - // all other exceptions - throw new RuntimeException(ce); + if (e.getCause() != null && + (e.getCause().getClass().isAssignableFrom(IllegalStateException.class) || e.getCause().getClass().isAssignableFrom(WakeupException.class))) { + PscPartitionSplitRecords recordsBySplits = + new PscPartitionSplitRecords( + PscConsumerMessagesIterable.emptyIterable(), pscSourceReaderMetrics); + markEmptySplitsAsFinished(recordsBySplits); + return recordsBySplits; + } + throw new RuntimeException(e); } PscPartitionSplitRecords recordsBySplits = new PscPartitionSplitRecords(consumerMessagesIterable, pscSourceReaderMetrics); diff --git a/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java index 1dc158a..b27bd32 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java @@ -702,9 +702,9 @@ public PscConsumerPollMessageIterator poll(Duration pollTimeout) throws Co acquireAndEnsureOpen(); try { if (messageListener != null) { - throw new ConsumerException(ExceptionMessage.MUTUALLY_EXCLUSIVE_APIS("poll()", "MessageListener")); + throw new ConsumerException(ExceptionMessage.MUTUALLY_EXCLUSIVE_APIS("poll()", "MessageListener"), new IllegalStateException()); } else if (!subscribed.get() && !assigned.get()) { - throw new ConsumerException(ExceptionMessage.NO_SUBSCRIPTION_ASSIGNMENT("poll()")); + throw new ConsumerException(ExceptionMessage.NO_SUBSCRIPTION_ASSIGNMENT("poll()"), new IllegalStateException()); } return internalPoll(pollTimeout); } finally { From 95ed3c8eebb282b54869d631519bcc8d3c001eeb Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 26 Jan 2026 16:19:17 -0500 Subject: [PATCH 4/7] Explicitly rethrow exception --- .../psc/source/reader/PscTopicUriPartitionSplitReader.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java index 025e21e..3dbfb87 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java @@ -120,7 +120,10 @@ public RecordsWithSplitIds> fetch() throws IO PscConsumerMessagesIterable.emptyIterable(), pscSourceReaderMetrics); markEmptySplitsAsFinished(recordsBySplits); return recordsBySplits; + } else { + throw new RuntimeException(e); } + } catch (Exception e) { throw new RuntimeException(e); } PscPartitionSplitRecords recordsBySplits = From 97095e7c2ce7c17ee3e68a8404a8f27b3300bbab Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 26 Jan 2026 16:55:54 -0500 Subject: [PATCH 5/7] Add log to debug exception throw --- .../psc/source/reader/PscTopicUriPartitionSplitReader.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java index 3dbfb87..0210498 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java @@ -115,15 +115,18 @@ public RecordsWithSplitIds> fetch() throws IO // record container, and this consumer will be closed by SplitFetcherManager. if (e.getCause() != null && (e.getCause().getClass().isAssignableFrom(IllegalStateException.class) || e.getCause().getClass().isAssignableFrom(WakeupException.class))) { + LOG.warn("Caught IllegalStateException or WakeupException in poll(), marking partitions as finished", e); PscPartitionSplitRecords recordsBySplits = new PscPartitionSplitRecords( PscConsumerMessagesIterable.emptyIterable(), pscSourceReaderMetrics); markEmptySplitsAsFinished(recordsBySplits); return recordsBySplits; } else { + LOG.error("Unrecoverable ConsumerException caught in poll()", e); throw new RuntimeException(e); } } catch (Exception e) { + LOG.error("Unrecoverable Exception caught in poll()", e); throw new RuntimeException(e); } PscPartitionSplitRecords recordsBySplits = From 8ca531473b7aeaf7d5c7c0b71942532ad35f22e5 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 26 Jan 2026 17:17:06 -0500 Subject: [PATCH 6/7] Check for exception class equality --- .../psc/source/reader/PscTopicUriPartitionSplitReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java index 0210498..4e8adc5 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java @@ -114,7 +114,7 @@ public RecordsWithSplitIds> fetch() throws IO // stopping offset). We just mark empty partitions as finished and return an empty // record container, and this consumer will be closed by SplitFetcherManager. if (e.getCause() != null && - (e.getCause().getClass().isAssignableFrom(IllegalStateException.class) || e.getCause().getClass().isAssignableFrom(WakeupException.class))) { + (e.getCause().getClass().equals(IllegalStateException.class) || e.getCause().getClass().equals(WakeupException.class))) { LOG.warn("Caught IllegalStateException or WakeupException in poll(), marking partitions as finished", e); PscPartitionSplitRecords recordsBySplits = new PscPartitionSplitRecords( From ce4e40bfb6da7df9c9c24ede4793be047ff37d5f Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Tue, 27 Jan 2026 13:18:08 -0500 Subject: [PATCH 7/7] Add unit tests for catching / throwing exceptions --- .../PscTopicUriPartitionSplitReaderTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java index 275ccf1..4b4800d 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java @@ -24,8 +24,10 @@ import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub; import com.pinterest.psc.common.TopicUriPartition; import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.consumer.PscConsumer; import com.pinterest.psc.consumer.PscConsumerMessage; import com.pinterest.psc.exception.ClientException; +import com.pinterest.psc.exception.consumer.ConsumerException; import com.pinterest.psc.exception.consumer.DeserializerException; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.serde.ByteArrayDeserializer; @@ -46,6 +48,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; +import org.apache.kafka.common.KafkaException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; @@ -56,7 +59,9 @@ import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource; +import java.lang.reflect.Field; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -73,6 +78,9 @@ import static com.pinterest.flink.connector.psc.testutils.PscSourceTestEnv.NUM_RECORDS_PER_PARTITION; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** Unit tests for {@link PscTopicUriPartitionSplitReader}. */ public class PscTopicUriPartitionSplitReaderTest { @@ -161,6 +169,44 @@ public void testWakeupThenAssign() throws IOException, ConfigurationException, C new PscTopicUriPartitionSplit(tp, PscTopicUriPartitionSplit.EARLIEST_OFFSET))); } + @Test + public void testFetchThrowsOnNonWakeupConsumerException() throws Exception { + PscTopicUriPartitionSplitReader reader = createReader(); + @SuppressWarnings("unchecked") + PscConsumer mockConsumer = (PscConsumer) mock(PscConsumer.class); + when(mockConsumer.poll(any(Duration.class))) + .thenThrow(new ConsumerException(new KafkaException("boom"))); + + Field consumerField = PscTopicUriPartitionSplitReader.class.getDeclaredField("consumer"); + consumerField.setAccessible(true); + consumerField.set(reader, mockConsumer); + + assertThatThrownBy(reader::fetch) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(ConsumerException.class) + .hasRootCauseInstanceOf(KafkaException.class); + } + + @Test + public void testFetchThrowsOnKafkaExceptionWithOomCause() throws Exception { + PscTopicUriPartitionSplitReader reader = createReader(); + @SuppressWarnings("unchecked") + PscConsumer mockConsumer = (PscConsumer) mock(PscConsumer.class); + KafkaException kafkaException = + new KafkaException("Failed to load record batch", new OutOfMemoryError("Java heap space")); + when(mockConsumer.poll(any(Duration.class))) + .thenThrow(new ConsumerException(kafkaException)); + + Field consumerField = PscTopicUriPartitionSplitReader.class.getDeclaredField("consumer"); + consumerField.setAccessible(true); + consumerField.set(reader, mockConsumer); + + assertThatThrownBy(reader::fetch) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(ConsumerException.class) + .hasRootCauseInstanceOf(OutOfMemoryError.class); + } + @Test public void testNumBytesInCounter() throws Exception { final OperatorMetricGroup operatorMetricGroup =