diff --git a/pom.xml b/pom.xml
index 4ce36e73..52ba8d1c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
psc-java-oss
PubSub Client (PSC)
https://github.com/pinterest/psc
- 4.1.4
+ 4.1.5-SNAPSHOT
pom
psc-java-oss
diff --git a/psc-common/pom.xml b/psc-common/pom.xml
index 9f637ffa..280c9fab 100644
--- a/psc-common/pom.xml
+++ b/psc-common/pom.xml
@@ -5,7 +5,7 @@
psc-java-oss
com.pinterest.psc
- 4.1.4
+ 4.1.5-SNAPSHOT
../pom.xml
4.0.0
diff --git a/psc-examples/pom.xml b/psc-examples/pom.xml
index ff201f8f..815f2a28 100644
--- a/psc-examples/pom.xml
+++ b/psc-examples/pom.xml
@@ -5,13 +5,13 @@
psc-java-oss
com.pinterest.psc
- 4.1.4
+ 4.1.5-SNAPSHOT
../pom.xml
4.0.0
psc-examples
- 4.1.4
+ 4.1.5-SNAPSHOT
psc-examples
diff --git a/psc-flink-logging/pom.xml b/psc-flink-logging/pom.xml
index a4300147..372df674 100644
--- a/psc-flink-logging/pom.xml
+++ b/psc-flink-logging/pom.xml
@@ -5,13 +5,13 @@
psc-java-oss
com.pinterest.psc
- 4.1.4
+ 4.1.5-SNAPSHOT
../pom.xml
4.0.0
psc-flink-logging
- 4.1.4
+ 4.1.5-SNAPSHOT
diff --git a/psc-flink/pom.xml b/psc-flink/pom.xml
index d76eaaaa..f79e060f 100644
--- a/psc-flink/pom.xml
+++ b/psc-flink/pom.xml
@@ -5,7 +5,7 @@
com.pinterest.psc
psc-java-oss
- 4.1.4
+ 4.1.5-SNAPSHOT
../pom.xml
psc-flink
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java
index e6e6f77a..4e8adc54 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java
@@ -113,11 +113,21 @@ public RecordsWithSplitIds> fetch() throws IO
// This happens if all assigned partitions are invalid or empty (starting offset >=
// stopping offset). We just mark empty partitions as finished and return an empty
// record container, and this consumer will be closed by SplitFetcherManager.
- PscPartitionSplitRecords recordsBySplits =
- new PscPartitionSplitRecords(
- PscConsumerMessagesIterable.emptyIterable(), pscSourceReaderMetrics);
- markEmptySplitsAsFinished(recordsBySplits);
- return recordsBySplits;
+ if (e.getCause() != null &&
+ (e.getCause().getClass().equals(IllegalStateException.class) || e.getCause().getClass().equals(WakeupException.class))) {
+ LOG.warn("Caught IllegalStateException or WakeupException in poll(), marking partitions as finished", e);
+ PscPartitionSplitRecords recordsBySplits =
+ new PscPartitionSplitRecords(
+ PscConsumerMessagesIterable.emptyIterable(), pscSourceReaderMetrics);
+ markEmptySplitsAsFinished(recordsBySplits);
+ return recordsBySplits;
+ } else {
+ LOG.error("Unrecoverable ConsumerException caught in poll()", e);
+ throw new RuntimeException(e);
+ }
+ } catch (Exception e) {
+ LOG.error("Unrecoverable Exception caught in poll()", e);
+ throw new RuntimeException(e);
}
PscPartitionSplitRecords recordsBySplits =
new PscPartitionSplitRecords(consumerMessagesIterable, pscSourceReaderMetrics);
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java
index 275ccf13..4b4800d7 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java
@@ -24,8 +24,10 @@
import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.config.PscConfiguration;
+import com.pinterest.psc.consumer.PscConsumer;
import com.pinterest.psc.consumer.PscConsumerMessage;
import com.pinterest.psc.exception.ClientException;
+import com.pinterest.psc.exception.consumer.ConsumerException;
import com.pinterest.psc.exception.consumer.DeserializerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.serde.ByteArrayDeserializer;
@@ -46,6 +48,7 @@
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.common.KafkaException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
@@ -56,7 +59,9 @@
import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;
+import java.lang.reflect.Field;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -73,6 +78,9 @@
import static com.pinterest.flink.connector.psc.testutils.PscSourceTestEnv.NUM_RECORDS_PER_PARTITION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/** Unit tests for {@link PscTopicUriPartitionSplitReader}. */
public class PscTopicUriPartitionSplitReaderTest {
@@ -161,6 +169,44 @@ public void testWakeupThenAssign() throws IOException, ConfigurationException, C
new PscTopicUriPartitionSplit(tp, PscTopicUriPartitionSplit.EARLIEST_OFFSET)));
}
+ @Test
+ public void testFetchThrowsOnNonWakeupConsumerException() throws Exception {
+ PscTopicUriPartitionSplitReader reader = createReader();
+ @SuppressWarnings("unchecked")
+ PscConsumer mockConsumer = (PscConsumer) mock(PscConsumer.class);
+ when(mockConsumer.poll(any(Duration.class)))
+ .thenThrow(new ConsumerException(new KafkaException("boom")));
+
+ Field consumerField = PscTopicUriPartitionSplitReader.class.getDeclaredField("consumer");
+ consumerField.setAccessible(true);
+ consumerField.set(reader, mockConsumer);
+
+ assertThatThrownBy(reader::fetch)
+ .isInstanceOf(RuntimeException.class)
+ .hasCauseInstanceOf(ConsumerException.class)
+ .hasRootCauseInstanceOf(KafkaException.class);
+ }
+
+ @Test
+ public void testFetchThrowsOnKafkaExceptionWithOomCause() throws Exception {
+ PscTopicUriPartitionSplitReader reader = createReader();
+ @SuppressWarnings("unchecked")
+ PscConsumer mockConsumer = (PscConsumer) mock(PscConsumer.class);
+ KafkaException kafkaException =
+ new KafkaException("Failed to load record batch", new OutOfMemoryError("Java heap space"));
+ when(mockConsumer.poll(any(Duration.class)))
+ .thenThrow(new ConsumerException(kafkaException));
+
+ Field consumerField = PscTopicUriPartitionSplitReader.class.getDeclaredField("consumer");
+ consumerField.setAccessible(true);
+ consumerField.set(reader, mockConsumer);
+
+ assertThatThrownBy(reader::fetch)
+ .isInstanceOf(RuntimeException.class)
+ .hasCauseInstanceOf(ConsumerException.class)
+ .hasRootCauseInstanceOf(OutOfMemoryError.class);
+ }
+
@Test
public void testNumBytesInCounter() throws Exception {
final OperatorMetricGroup operatorMetricGroup =
diff --git a/psc-integration-test/pom.xml b/psc-integration-test/pom.xml
index 3b125209..b3b727f2 100644
--- a/psc-integration-test/pom.xml
+++ b/psc-integration-test/pom.xml
@@ -5,7 +5,7 @@
psc-java-oss
com.pinterest.psc
- 4.1.4
+ 4.1.5-SNAPSHOT
../pom.xml
4.0.0
diff --git a/psc-logging/pom.xml b/psc-logging/pom.xml
index 398b0f28..df172b00 100644
--- a/psc-logging/pom.xml
+++ b/psc-logging/pom.xml
@@ -5,7 +5,7 @@
psc-java-oss
com.pinterest.psc
- 4.1.4
+ 4.1.5-SNAPSHOT
../pom.xml
4.0.0
diff --git a/psc/pom.xml b/psc/pom.xml
index fb7054d1..647e26ea 100644
--- a/psc/pom.xml
+++ b/psc/pom.xml
@@ -5,7 +5,7 @@
psc-java-oss
com.pinterest.psc
- 4.1.4
+ 4.1.5-SNAPSHOT
../pom.xml
4.0.0
diff --git a/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java
index 1dc158a4..b27bd32e 100644
--- a/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java
+++ b/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java
@@ -702,9 +702,9 @@ public PscConsumerPollMessageIterator poll(Duration pollTimeout) throws Co
acquireAndEnsureOpen();
try {
if (messageListener != null) {
- throw new ConsumerException(ExceptionMessage.MUTUALLY_EXCLUSIVE_APIS("poll()", "MessageListener"));
+ throw new ConsumerException(ExceptionMessage.MUTUALLY_EXCLUSIVE_APIS("poll()", "MessageListener"), new IllegalStateException());
} else if (!subscribed.get() && !assigned.get()) {
- throw new ConsumerException(ExceptionMessage.NO_SUBSCRIPTION_ASSIGNMENT("poll()"));
+ throw new ConsumerException(ExceptionMessage.NO_SUBSCRIPTION_ASSIGNMENT("poll()"), new IllegalStateException());
}
return internalPoll(pollTimeout);
} finally {