Skip to content

Replace deprecated commitRecord method to support Kafka 4.0 #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testAuthenticatedQueueManager() throws Exception {
assertNull(kafkaMessage.key());
assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema());

newConnectTask.commitRecord(kafkaMessage);
newConnectTask.commitRecord(kafkaMessage, null);
}

assertArrayEquals("hello".getBytes(), (byte[]) kafkaMessages.get(0).value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void testIfErrorsOccurWithPollRollbackAndContinues() throws Exception {
assertThat(exc).isNotNull();
assertThat(exc).isInstanceOf(RetriableException.class);
for (SourceRecord record : sourceRecords) {
connectTask.commitRecord(record);
connectTask.commitRecord(record, null);
}

assertThat(sourceRecords.size()).isEqualTo(0);
Expand Down Expand Up @@ -341,7 +341,7 @@ private static void pollCommitAndAssert(MQSourceTask connectTask, int recordsPro
try {
sourceRecords = connectTask.poll();
for (SourceRecord record : sourceRecords) {
connectTask.commitRecord(record);
connectTask.commitRecord(record, null);
}

} catch(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void verifyJmsTextMessages() throws Exception {
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());

connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertEquals("hello", kafkaMessages.get(0).value());
Expand Down Expand Up @@ -163,7 +163,7 @@ public void verifyJmsJsonMessages() throws Exception {
final Map<?, ?> value = (Map<?, ?>) kafkaMessage.value();
assertEquals(Long.valueOf(i), value.get("i"));

connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}
}

Expand Down Expand Up @@ -192,7 +192,7 @@ public void verifyMQMessage() throws Exception {
assertEquals(received.getClass(), byte[].class);
assertEquals(new String((byte[]) received, StandardCharsets.UTF_8), sent);

connectTask.commitRecord(firstMsg);
connectTask.commitRecord(firstMsg, null);
connectTask.poll();
}

Expand Down Expand Up @@ -227,7 +227,7 @@ public void verifyJmsMessageHeaders() throws Exception {
assertEquals("11", kafkaMessage.headers().lastWithName("volume").value());
assertEquals("42.0", kafkaMessage.headers().lastWithName("decimalmeaning").value());

connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

@Test
Expand All @@ -254,28 +254,28 @@ public void verifyMessageBatchIndividualCommits() throws Exception {
assertEquals(10, kafkaMessages.size());
for (final SourceRecord kafkaMessage : kafkaMessages) {
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

kafkaMessages = connectTask.poll();
assertEquals(10, kafkaMessages.size());
for (final SourceRecord kafkaMessage : kafkaMessages) {
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

kafkaMessages = connectTask.poll();
assertEquals(10, kafkaMessages.size());
for (final SourceRecord kafkaMessage : kafkaMessages) {
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

kafkaMessages = connectTask.poll();
assertEquals(5, kafkaMessages.size());
for (final SourceRecord kafkaMessage : kafkaMessages) {
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}
}

Expand All @@ -299,25 +299,25 @@ public void verifyMessageBatchGroupCommits() throws Exception {
kafkaMessages = connectTask.poll();
assertEquals(10, kafkaMessages.size());
for (final SourceRecord m : kafkaMessages) {
connectTask.commitRecord(m);
connectTask.commitRecord(m, null);
}

kafkaMessages = connectTask.poll();
assertEquals(10, kafkaMessages.size());
for (final SourceRecord m : kafkaMessages) {
connectTask.commitRecord(m);
connectTask.commitRecord(m, null);
}

kafkaMessages = connectTask.poll();
assertEquals(10, kafkaMessages.size());
for (final SourceRecord m : kafkaMessages) {
connectTask.commitRecord(m);
connectTask.commitRecord(m, null);
}

kafkaMessages = connectTask.poll();
assertEquals(5, kafkaMessages.size());
for (final SourceRecord m : kafkaMessages) {
connectTask.commitRecord(m);
connectTask.commitRecord(m, null);
}
}

Expand Down Expand Up @@ -346,7 +346,7 @@ public void verifyMessageIdAsKey() throws Exception {

assertEquals("testmessage", kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

@Test
Expand Down Expand Up @@ -374,13 +374,13 @@ public void verifyCorrelationIdAsKey() throws Exception {
assertEquals("verifycorrel", kafkaMessage1.key());
assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage1.keySchema());
assertEquals("first message", kafkaMessage1.value());
connectTask.commitRecord(kafkaMessage1);
connectTask.commitRecord(kafkaMessage1, null);

final SourceRecord kafkaMessage2 = kafkaMessages.get(1);
assertEquals("5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4", kafkaMessage2.key());
assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage2.keySchema());
assertEquals("second message", kafkaMessage2.value());
connectTask.commitRecord(kafkaMessage2);
connectTask.commitRecord(kafkaMessage2, null);
}

@Test
Expand Down Expand Up @@ -408,7 +408,7 @@ public void verifyCorrelationIdBytesAsKey() throws Exception {

assertEquals("testmessagewithcorrelbytes", kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

@Test
Expand All @@ -435,7 +435,7 @@ public void verifyDestinationAsKey() throws Exception {

assertEquals("testmessagewithdest", kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

@Test
Expand Down Expand Up @@ -467,7 +467,7 @@ public void testSequenceStateMsgReadUnderMQTx() throws Exception {
assertThat(stateMsgs1.size()).isEqualTo(1);

for (final SourceRecord m : kafkaMessages) {
connectTask.commitRecord(m);
connectTask.commitRecord(m, null);
}

/// make commit do rollback when poll is called
Expand Down Expand Up @@ -651,7 +651,7 @@ public void verifyEmptyMessage() throws Exception {
final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertNull(kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

@Test
Expand All @@ -674,7 +674,7 @@ public void verifyEmptyTextMessage() throws Exception {
final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertNull(kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

@Test
Expand Down Expand Up @@ -752,7 +752,7 @@ public void verifyErrorToleranceMessages() throws Exception {
final Map<?, ?> value = (Map<?, ?>) validRecord.value();
assertThat(value.get("i")).isEqualTo(Long.valueOf(i));

connectTask.commitRecord(validRecord);
connectTask.commitRecord(validRecord, null);
}
}

Expand Down Expand Up @@ -797,7 +797,7 @@ public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll()
final Map<?, ?> value = (Map<?, ?>) validRecord.value();
assertThat(value.get("i")).isEqualTo(Long.valueOf(i - 1));

connectTask.commitRecord(validRecord);
connectTask.commitRecord(validRecord, null);
}
}

Expand Down Expand Up @@ -876,7 +876,7 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
assertEquals(headers.lastWithName("__connect.errors.jms.message.id").value(), message.getJMSMessageID());
assertEquals(headers.lastWithName("__connect.errors.jms.timestamp").value(), message.getJMSTimestamp());
assertEquals(headers.lastWithName("__connect.errors.mq.queue").value(), DEFAULT_SOURCE_QUEUE);
connectTask.commitRecord(dlqRecord);
connectTask.commitRecord(dlqRecord, null);
}

@Test
Expand Down Expand Up @@ -910,7 +910,7 @@ public void shouldHandleDifferentMessageTypesToDlq() throws Exception {
for (final SourceRecord dlqRecord : processedRecords) {
assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source");
assertThat(dlqRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES);
connectTask.commitRecord(dlqRecord);
connectTask.commitRecord(dlqRecord, null);
}
}

Expand Down Expand Up @@ -943,7 +943,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
assertThat(headers.lastWithName("__connect.errors.exception.message").value())
.isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");

connectTask.commitRecord(dlqRecord);
connectTask.commitRecord(dlqRecord, null);
}

@Test
Expand Down Expand Up @@ -984,7 +984,7 @@ public void shouldHandleMixOfValidAndInvalidMessagesWithDifferentFormats() throw
validCount++;
assertThat(record.topic()).isEqualTo("mytopic");
}
connectTask.commitRecord(record);
connectTask.commitRecord(record, null);
}

assertThat(validCount).isEqualTo(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testPollGetSequenceId_GivenSequenceStateIsPresentOnQueue() throws Ex
final List<SourceRecord> kafkaMessages = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessages) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertThat(kafkaMessages)
Expand All @@ -145,7 +145,7 @@ public void testPollGetsSequenceId_GivenSequenceStateIsNotPresentOnQueue() throw
final List<SourceRecord> kafkaMessages = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessages) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertThat(kafkaMessages)
Expand All @@ -166,7 +166,7 @@ public void testPollEndsWithPutSequenceStateOnQueue_GivenMessagesHaveBeenReceive
final List<SourceRecord> kafkaMessages = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessages) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertThat(getSequenceStateAndAssertNotEmpty()).isEqualTo(new SequenceState(
Expand Down Expand Up @@ -215,7 +215,7 @@ private void pollAndAssert(int expectedBatchSize, long sequenceId) throws Except
.contains(sequenceId);

for (final SourceRecord m : kafkaMessages) {
connectTask.commitRecord(m);
connectTask.commitRecord(m, null);
}

assertThat(getSequenceStateAndAssertNotEmpty()).isEqualTo(new SequenceState(
Expand Down Expand Up @@ -246,7 +246,7 @@ public void testSourceOffset() throws Exception {
final List<SourceRecord> sourceRecords = connectTask.poll();

for (final SourceRecord sourceRecord : sourceRecords) {
connectTask.commitRecord(sourceRecord);
connectTask.commitRecord(sourceRecord, null);
}

assertThat(sourceRecords)
Expand Down Expand Up @@ -275,7 +275,7 @@ public void test_IfSequenceStateIsOnStateQueue_ThenActionIsREDELIVER_UNSENT_BATC
final List<SourceRecord> kafkaMessages = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessages) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertThat(kafkaMessages)
Expand All @@ -298,7 +298,7 @@ public void testConnectorFirstTimeRunStart_ActionIsStandardGet_AndStateNotInKafk
final List<SourceRecord> kafkaMessages = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessages) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertThat(kafkaMessages)
Expand Down Expand Up @@ -329,7 +329,7 @@ public void testOnlyOnceDisabled_NoStateSaved_AndSequenceIdIncrements() throws E
kafkaMessages = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessages) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertThat(kafkaMessages)
Expand All @@ -349,7 +349,7 @@ public void testOnlyOnceDisabled_NoStateSaved_AndSequenceIdIncrements() throws E
kafkaMessages = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessages) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertThat(kafkaMessages)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testOnlyOnceStartBehaviour_GivenNoSequenceStateIsPresentOnQueueOrKaf
final List<SourceRecord> kafkaMessages = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessages) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

// Check that MQ has been read from
Expand Down Expand Up @@ -169,7 +169,7 @@ public void testOnlyOnceStartBehaviour_GivenNoSequenceStateIsPresentOnQueueButOf
final List<SourceRecord> kafkaMessages = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessages) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

// Check that MQ has been read from
Expand Down Expand Up @@ -500,7 +500,7 @@ public void testOnlyOnceStartBehaviour_CrashAfterKafkaCommitBeforeMQCommit() thr
assertThat(kafkaMessagesRoundOne.size()).isEqualTo(2);

for (final SourceRecord kafkaMessage : kafkaMessagesRoundOne) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertThat(kafkaMessagesRoundOne)
Expand All @@ -518,7 +518,7 @@ public void testOnlyOnceStartBehaviour_CrashAfterKafkaCommitBeforeMQCommit() thr
assertThat(kafkaMessagesRoundTwo.size()).isEqualTo(2);

for (final SourceRecord kafkaMessage : kafkaMessagesRoundTwo) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

assertThat(kafkaMessagesRoundTwo)
Expand All @@ -540,7 +540,7 @@ public void testOnlyOnceStartBehaviour_CrashAfterKafkaCommitBeforeMQCommit() thr
final List<SourceRecord> kafkaMessagesRoundThree = connectTask.poll();

for (final SourceRecord kafkaMessage : kafkaMessagesRoundThree) {
connectTask.commitRecord(kafkaMessage);
connectTask.commitRecord(kafkaMessage, null);
}

// These messages would have been returned if onlyOnce wasn't implemented.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.ibm.eventstreams.connect.mqsource.util.LogMessages;
import com.ibm.eventstreams.connect.mqsource.util.ExceptionProcessor;
import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -533,7 +535,7 @@ public void stop() {
* @throws InterruptedException
*/
@Override
public void commitRecord(final SourceRecord record) throws InterruptedException {
public void commitRecord(final SourceRecord record, final RecordMetadata metadata) throws InterruptedException {
log.trace("[{}] Entry {}.commitRecord, record={}", Thread.currentThread().getId(), this.getClass().getName(),
record);

Expand Down
Loading