diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java
index b85353a..29d2ad0 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java
@@ -110,7 +110,7 @@ public void testAuthenticatedQueueManager() throws Exception {
             assertNull(kafkaMessage.key());
             assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema());
 
-            newConnectTask.commitRecord(kafkaMessage);
+            newConnectTask.commitRecord(kafkaMessage, null);
         }
 
         assertArrayEquals("hello".getBytes(), (byte[]) kafkaMessages.get(0).value());
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskExceptionHandlingIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskExceptionHandlingIT.java
index 9c38172..cd085c1 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskExceptionHandlingIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskExceptionHandlingIT.java
@@ -221,7 +221,7 @@ public void testIfErrorsOccurWithPollRollbackAndContinues() throws Exception {
         assertThat(exc).isNotNull();
         assertThat(exc).isInstanceOf(RetriableException.class);
         for (SourceRecord record : sourceRecords) {
-            connectTask.commitRecord(record);
+            connectTask.commitRecord(record, null);
         }
 
         assertThat(sourceRecords.size()).isEqualTo(0);
@@ -341,7 +341,7 @@ private static void pollCommitAndAssert(MQSourceTask connectTask, int recordsPro
         try {
             sourceRecords = connectTask.poll();
             for (SourceRecord record : sourceRecords) {
-                connectTask.commitRecord(record);
+                connectTask.commitRecord(record, null);
             }
 
         } catch(Exception e) {
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
index 63efbf0..7501f25 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -126,7 +126,7 @@ public void verifyJmsTextMessages() throws Exception {
             assertNull(kafkaMessage.key());
             assertNull(kafkaMessage.valueSchema());
 
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         assertEquals("hello", kafkaMessages.get(0).value());
@@ -163,7 +163,7 @@ public void verifyJmsJsonMessages() throws Exception {
             final Map value = (Map) kafkaMessage.value();
             assertEquals(Long.valueOf(i), value.get("i"));
 
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
     }
@@ -192,7 +192,7 @@ public void verifyMQMessage() throws Exception {
         assertEquals(received.getClass(), byte[].class);
         assertEquals(new String((byte[]) received, StandardCharsets.UTF_8), sent);
 
-        connectTask.commitRecord(firstMsg);
+        connectTask.commitRecord(firstMsg, null);
         connectTask.poll();
     }
@@ -227,7 +227,7 @@ public void verifyJmsMessageHeaders() throws Exception {
         assertEquals("11", kafkaMessage.headers().lastWithName("volume").value());
         assertEquals("42.0", kafkaMessage.headers().lastWithName("decimalmeaning").value());
 
-        connectTask.commitRecord(kafkaMessage);
+        connectTask.commitRecord(kafkaMessage, null);
     }
 
     @Test
@@ -254,28 +254,28 @@ public void verifyMessageBatchIndividualCommits() throws Exception {
         assertEquals(10, kafkaMessages.size());
         for (final SourceRecord kafkaMessage : kafkaMessages) {
             assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         kafkaMessages = connectTask.poll();
         assertEquals(10, kafkaMessages.size());
         for (final SourceRecord kafkaMessage : kafkaMessages) {
             assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         kafkaMessages = connectTask.poll();
         assertEquals(10, kafkaMessages.size());
         for (final SourceRecord kafkaMessage : kafkaMessages) {
             assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         kafkaMessages = connectTask.poll();
         assertEquals(5, kafkaMessages.size());
         for (final SourceRecord kafkaMessage : kafkaMessages) {
             assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
     }
@@ -299,25 +299,25 @@ public void verifyMessageBatchGroupCommits() throws Exception {
         kafkaMessages = connectTask.poll();
         assertEquals(10, kafkaMessages.size());
         for (final SourceRecord m : kafkaMessages) {
-            connectTask.commitRecord(m);
+            connectTask.commitRecord(m, null);
         }
 
         kafkaMessages = connectTask.poll();
         assertEquals(10, kafkaMessages.size());
         for (final SourceRecord m : kafkaMessages) {
-            connectTask.commitRecord(m);
+            connectTask.commitRecord(m, null);
        }
 
         kafkaMessages = connectTask.poll();
         assertEquals(10, kafkaMessages.size());
         for (final SourceRecord m : kafkaMessages) {
-            connectTask.commitRecord(m);
+            connectTask.commitRecord(m, null);
         }
 
         kafkaMessages = connectTask.poll();
         assertEquals(5, kafkaMessages.size());
         for (final SourceRecord m : kafkaMessages) {
-            connectTask.commitRecord(m);
+            connectTask.commitRecord(m, null);
         }
     }
@@ -346,7 +346,7 @@ public void verifyMessageIdAsKey() throws Exception {
         assertEquals("testmessage", kafkaMessage.value());
 
-        connectTask.commitRecord(kafkaMessage);
+        connectTask.commitRecord(kafkaMessage, null);
     }
 
     @Test
@@ -374,13 +374,13 @@ public void verifyCorrelationIdAsKey() throws Exception {
         assertEquals("verifycorrel", kafkaMessage1.key());
         assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage1.keySchema());
         assertEquals("first message", kafkaMessage1.value());
-        connectTask.commitRecord(kafkaMessage1);
+        connectTask.commitRecord(kafkaMessage1, null);
 
         final SourceRecord kafkaMessage2 = kafkaMessages.get(1);
         assertEquals("5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4", kafkaMessage2.key());
         assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage2.keySchema());
         assertEquals("second message", kafkaMessage2.value());
-        connectTask.commitRecord(kafkaMessage2);
+        connectTask.commitRecord(kafkaMessage2, null);
     }
 
     @Test
@@ -408,7 +408,7 @@ public void verifyCorrelationIdBytesAsKey() throws Exception {
         assertEquals("testmessagewithcorrelbytes", kafkaMessage.value());
 
-        connectTask.commitRecord(kafkaMessage);
+        connectTask.commitRecord(kafkaMessage, null);
     }
 
     @Test
@@ -435,7 +435,7 @@ public void verifyDestinationAsKey() throws Exception {
         assertEquals("testmessagewithdest", kafkaMessage.value());
 
-        connectTask.commitRecord(kafkaMessage);
+        connectTask.commitRecord(kafkaMessage, null);
     }
 
     @Test
@@ -467,7 +467,7 @@ public void testSequenceStateMsgReadUnderMQTx() throws Exception {
         assertThat(stateMsgs1.size()).isEqualTo(1);
         for (final SourceRecord m : kafkaMessages) {
-            connectTask.commitRecord(m);
+            connectTask.commitRecord(m, null);
         }
 
         /// make commit do rollback when poll is called
@@ -651,7 +651,7 @@ public void verifyEmptyMessage() throws Exception {
         final SourceRecord kafkaMessage = kafkaMessages.get(0);
         assertNull(kafkaMessage.value());
 
-        connectTask.commitRecord(kafkaMessage);
+        connectTask.commitRecord(kafkaMessage, null);
     }
 
     @Test
@@ -674,7 +674,7 @@ public void verifyEmptyTextMessage() throws Exception {
         final SourceRecord kafkaMessage = kafkaMessages.get(0);
         assertNull(kafkaMessage.value());
 
-        connectTask.commitRecord(kafkaMessage);
+        connectTask.commitRecord(kafkaMessage, null);
     }
 
     @Test
@@ -752,7 +752,7 @@ public void verifyErrorToleranceMessages() throws Exception {
             final Map value = (Map) validRecord.value();
             assertThat(value.get("i")).isEqualTo(Long.valueOf(i));
 
-            connectTask.commitRecord(validRecord);
+            connectTask.commitRecord(validRecord, null);
         }
     }
@@ -797,7 +797,7 @@ public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll()
             final Map value = (Map) validRecord.value();
             assertThat(value.get("i")).isEqualTo(Long.valueOf(i - 1));
 
-            connectTask.commitRecord(validRecord);
+            connectTask.commitRecord(validRecord, null);
         }
     }
@@ -876,7 +876,7 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
         assertEquals(headers.lastWithName("__connect.errors.jms.message.id").value(), message.getJMSMessageID());
         assertEquals(headers.lastWithName("__connect.errors.jms.timestamp").value(), message.getJMSTimestamp());
         assertEquals(headers.lastWithName("__connect.errors.mq.queue").value(), DEFAULT_SOURCE_QUEUE);
-        connectTask.commitRecord(dlqRecord);
+        connectTask.commitRecord(dlqRecord, null);
     }
 
     @Test
@@ -910,7 +910,7 @@ public void shouldHandleDifferentMessageTypesToDlq() throws Exception {
         for (final SourceRecord dlqRecord : processedRecords) {
             assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source");
             assertThat(dlqRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES);
-            connectTask.commitRecord(dlqRecord);
+            connectTask.commitRecord(dlqRecord, null);
         }
     }
@@ -943,7 +943,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
         assertThat(headers.lastWithName("__connect.errors.exception.message").value())
                 .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");
 
-        connectTask.commitRecord(dlqRecord);
+        connectTask.commitRecord(dlqRecord, null);
     }
 
     @Test
@@ -984,7 +984,7 @@ public void shouldHandleMixOfValidAndInvalidMessagesWithDifferentFormats() throw
                 validCount++;
                 assertThat(record.topic()).isEqualTo("mytopic");
             }
-            connectTask.commitRecord(record);
+            connectTask.commitRecord(record, null);
         }
 
         assertThat(validCount).isEqualTo(3);
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceIT.java
index 3630242..f35949e 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceIT.java
@@ -126,7 +126,7 @@ public void testPollGetSequenceId_GivenSequenceStateIsPresentOnQueue() throws Ex
         final List<SourceRecord> kafkaMessages = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessages) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         assertThat(kafkaMessages)
@@ -145,7 +145,7 @@ public void testPollGetsSequenceId_GivenSequenceStateIsNotPresentOnQueue() throw
         final List<SourceRecord> kafkaMessages = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessages) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         assertThat(kafkaMessages)
@@ -166,7 +166,7 @@ public void testPollEndsWithPutSequenceStateOnQueue_GivenMessagesHaveBeenReceive
         final List<SourceRecord> kafkaMessages = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessages) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         assertThat(getSequenceStateAndAssertNotEmpty()).isEqualTo(new SequenceState(
@@ -215,7 +215,7 @@ private void pollAndAssert(int expectedBatchSize, long sequenceId) throws Except
                 .contains(sequenceId);
 
         for (final SourceRecord m : kafkaMessages) {
-            connectTask.commitRecord(m);
+            connectTask.commitRecord(m, null);
         }
 
         assertThat(getSequenceStateAndAssertNotEmpty()).isEqualTo(new SequenceState(
@@ -246,7 +246,7 @@ public void testSourceOffset() throws Exception {
         final List<SourceRecord> sourceRecords = connectTask.poll();
         for (final SourceRecord sourceRecord : sourceRecords) {
-            connectTask.commitRecord(sourceRecord);
+            connectTask.commitRecord(sourceRecord, null);
         }
 
         assertThat(sourceRecords)
@@ -275,7 +275,7 @@ public void test_IfSequenceStateIsOnStateQueue_ThenActionIsREDELIVER_UNSENT_BATC
         final List<SourceRecord> kafkaMessages = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessages) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         assertThat(kafkaMessages)
@@ -298,7 +298,7 @@ public void testConnectorFirstTimeRunStart_ActionIsStandardGet_AndStateNotInKafk
         final List<SourceRecord> kafkaMessages = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessages) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
        }
 
         assertThat(kafkaMessages)
@@ -329,7 +329,7 @@ public void testOnlyOnceDisabled_NoStateSaved_AndSequenceIdIncrements() throws E
         kafkaMessages = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessages) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         assertThat(kafkaMessages)
@@ -349,7 +349,7 @@ public void testOnlyOnceDisabled_NoStateSaved_AndSequenceIdIncrements() throws E
         kafkaMessages = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessages) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         assertThat(kafkaMessages)
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceStartBehaviourIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceStartBehaviourIT.java
index 992ea75..bb5ccbe 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceStartBehaviourIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceStartBehaviourIT.java
@@ -128,7 +128,7 @@ public void testOnlyOnceStartBehaviour_GivenNoSequenceStateIsPresentOnQueueOrKaf
         final List<SourceRecord> kafkaMessages = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessages) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         // Check that MQ has been read from
@@ -169,7 +169,7 @@ public void testOnlyOnceStartBehaviour_GivenNoSequenceStateIsPresentOnQueueButOf
         final List<SourceRecord> kafkaMessages = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessages) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         // Check that MQ has been read from
@@ -500,7 +500,7 @@ public void testOnlyOnceStartBehaviour_CrashAfterKafkaCommitBeforeMQCommit() thr
         assertThat(kafkaMessagesRoundOne.size()).isEqualTo(2);
         for (final SourceRecord kafkaMessage : kafkaMessagesRoundOne) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         assertThat(kafkaMessagesRoundOne)
@@ -518,7 +518,7 @@ public void testOnlyOnceStartBehaviour_CrashAfterKafkaCommitBeforeMQCommit() thr
         assertThat(kafkaMessagesRoundTwo.size()).isEqualTo(2);
         for (final SourceRecord kafkaMessage : kafkaMessagesRoundTwo) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         assertThat(kafkaMessagesRoundTwo)
@@ -540,7 +540,7 @@ public void testOnlyOnceStartBehaviour_CrashAfterKafkaCommitBeforeMQCommit() thr
         final List<SourceRecord> kafkaMessagesRoundThree = connectTask.poll();
         for (final SourceRecord kafkaMessage : kafkaMessagesRoundThree) {
-            connectTask.commitRecord(kafkaMessage);
+            connectTask.commitRecord(kafkaMessage, null);
         }
 
         // These messages would have been returned if onlyOnce wasn't implemented.
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
index 9a876d5..c9684cc 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
@@ -22,6 +22,8 @@
 import com.ibm.eventstreams.connect.mqsource.util.LogMessages;
 import com.ibm.eventstreams.connect.mqsource.util.ExceptionProcessor;
 import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -533,7 +535,7 @@ public void stop() {
      * @throws InterruptedException
      */
     @Override
-    public void commitRecord(final SourceRecord record) throws InterruptedException {
+    public void commitRecord(final SourceRecord record, final RecordMetadata metadata) throws InterruptedException {
         log.trace("[{}] Entry {}.commitRecord, record={}", Thread.currentThread().getId(), this.getClass().getName(),
                 record);
diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskTest.java
index 378acbb..f740b3a 100644
--- a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskTest.java
+++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskTest.java
@@ -240,7 +240,7 @@ public void testPollsBlockUntilBatchComplete() throws JMSRuntimeException, JMSEx
                 assertThat(pollDuringCommits).isNull();
             }
 
-            mqSourceTask.commitRecord(firstConnectMessagesBatch.get(i));
+            mqSourceTask.commitRecord(firstConnectMessagesBatch.get(i), null);
         }
 
         // now all messages are committed, a poll should return messages
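
Every hunk above applies the same mechanical migration: the deprecated single-argument SourceTask.commitRecord(SourceRecord) callback is replaced by the two-argument commitRecord(SourceRecord, RecordMetadata) overload, and the tests simply pass null for the producer metadata they do not need. A minimal sketch of the new callback shape, assuming a SourceTask subclass (the class name and the body are illustrative, not taken from this repository):

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Illustrative task; only the commitRecord override is shown.
    public abstract class ExampleSourceTask extends SourceTask {

        @Override
        public void commitRecord(final SourceRecord record, final RecordMetadata metadata) throws InterruptedException {
            // The metadata describes where the record landed in Kafka and may be null
            // (for example when the record was filtered out before being produced),
            // which is why the tests above can pass null directly.
            if (metadata != null) {
                // e.g. inspect metadata.topic(), metadata.partition(), metadata.offset()
            }
            // Acknowledge or track the delivered record against the source system here;
            // MQSourceTask uses this callback as part of its batch commit handling.
        }
    }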