Skip to content

Commit 8f16752

Browse files
Adil9645A S Adil Mohammad
andauthored
refactor: Replace deprecated commitRecord methods (#159)
Signed-off-by: A S Adil Mohammad <Adil.Mohammad@ibm.com> Co-authored-by: A S Adil Mohammad <Adil.Mohammad@ibm.com>
1 parent 66c4eb6 commit 8f16752

File tree

7 files changed

+47
-45
lines changed

7 files changed

+47
-45
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void testAuthenticatedQueueManager() throws Exception {
110110
assertNull(kafkaMessage.key());
111111
assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema());
112112

113-
newConnectTask.commitRecord(kafkaMessage);
113+
newConnectTask.commitRecord(kafkaMessage, null);
114114
}
115115

116116
assertArrayEquals("hello".getBytes(), (byte[]) kafkaMessages.get(0).value());

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskExceptionHandlingIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public void testIfErrorsOccurWithPollRollbackAndContinues() throws Exception {
221221
assertThat(exc).isNotNull();
222222
assertThat(exc).isInstanceOf(RetriableException.class);
223223
for (SourceRecord record : sourceRecords) {
224-
connectTask.commitRecord(record);
224+
connectTask.commitRecord(record, null);
225225
}
226226

227227
assertThat(sourceRecords.size()).isEqualTo(0);
@@ -341,7 +341,7 @@ private static void pollCommitAndAssert(MQSourceTask connectTask, int recordsPro
341341
try {
342342
sourceRecords = connectTask.poll();
343343
for (SourceRecord record : sourceRecords) {
344-
connectTask.commitRecord(record);
344+
connectTask.commitRecord(record, null);
345345
}
346346

347347
} catch(Exception e) {

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void verifyJmsTextMessages() throws Exception {
126126
assertNull(kafkaMessage.key());
127127
assertNull(kafkaMessage.valueSchema());
128128

129-
connectTask.commitRecord(kafkaMessage);
129+
connectTask.commitRecord(kafkaMessage, null);
130130
}
131131

132132
assertEquals("hello", kafkaMessages.get(0).value());
@@ -163,7 +163,7 @@ public void verifyJmsJsonMessages() throws Exception {
163163
final Map<?, ?> value = (Map<?, ?>) kafkaMessage.value();
164164
assertEquals(Long.valueOf(i), value.get("i"));
165165

166-
connectTask.commitRecord(kafkaMessage);
166+
connectTask.commitRecord(kafkaMessage, null);
167167
}
168168
}
169169

@@ -192,7 +192,7 @@ public void verifyMQMessage() throws Exception {
192192
assertEquals(received.getClass(), byte[].class);
193193
assertEquals(new String((byte[]) received, StandardCharsets.UTF_8), sent);
194194

195-
connectTask.commitRecord(firstMsg);
195+
connectTask.commitRecord(firstMsg, null);
196196
connectTask.poll();
197197
}
198198

@@ -227,7 +227,7 @@ public void verifyJmsMessageHeaders() throws Exception {
227227
assertEquals("11", kafkaMessage.headers().lastWithName("volume").value());
228228
assertEquals("42.0", kafkaMessage.headers().lastWithName("decimalmeaning").value());
229229

230-
connectTask.commitRecord(kafkaMessage);
230+
connectTask.commitRecord(kafkaMessage, null);
231231
}
232232

233233
@Test
@@ -254,28 +254,28 @@ public void verifyMessageBatchIndividualCommits() throws Exception {
254254
assertEquals(10, kafkaMessages.size());
255255
for (final SourceRecord kafkaMessage : kafkaMessages) {
256256
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
257-
connectTask.commitRecord(kafkaMessage);
257+
connectTask.commitRecord(kafkaMessage, null);
258258
}
259259

260260
kafkaMessages = connectTask.poll();
261261
assertEquals(10, kafkaMessages.size());
262262
for (final SourceRecord kafkaMessage : kafkaMessages) {
263263
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
264-
connectTask.commitRecord(kafkaMessage);
264+
connectTask.commitRecord(kafkaMessage, null);
265265
}
266266

267267
kafkaMessages = connectTask.poll();
268268
assertEquals(10, kafkaMessages.size());
269269
for (final SourceRecord kafkaMessage : kafkaMessages) {
270270
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
271-
connectTask.commitRecord(kafkaMessage);
271+
connectTask.commitRecord(kafkaMessage, null);
272272
}
273273

274274
kafkaMessages = connectTask.poll();
275275
assertEquals(5, kafkaMessages.size());
276276
for (final SourceRecord kafkaMessage : kafkaMessages) {
277277
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
278-
connectTask.commitRecord(kafkaMessage);
278+
connectTask.commitRecord(kafkaMessage, null);
279279
}
280280
}
281281

@@ -299,25 +299,25 @@ public void verifyMessageBatchGroupCommits() throws Exception {
299299
kafkaMessages = connectTask.poll();
300300
assertEquals(10, kafkaMessages.size());
301301
for (final SourceRecord m : kafkaMessages) {
302-
connectTask.commitRecord(m);
302+
connectTask.commitRecord(m, null);
303303
}
304304

305305
kafkaMessages = connectTask.poll();
306306
assertEquals(10, kafkaMessages.size());
307307
for (final SourceRecord m : kafkaMessages) {
308-
connectTask.commitRecord(m);
308+
connectTask.commitRecord(m, null);
309309
}
310310

311311
kafkaMessages = connectTask.poll();
312312
assertEquals(10, kafkaMessages.size());
313313
for (final SourceRecord m : kafkaMessages) {
314-
connectTask.commitRecord(m);
314+
connectTask.commitRecord(m, null);
315315
}
316316

317317
kafkaMessages = connectTask.poll();
318318
assertEquals(5, kafkaMessages.size());
319319
for (final SourceRecord m : kafkaMessages) {
320-
connectTask.commitRecord(m);
320+
connectTask.commitRecord(m, null);
321321
}
322322
}
323323

@@ -346,7 +346,7 @@ public void verifyMessageIdAsKey() throws Exception {
346346

347347
assertEquals("testmessage", kafkaMessage.value());
348348

349-
connectTask.commitRecord(kafkaMessage);
349+
connectTask.commitRecord(kafkaMessage, null);
350350
}
351351

352352
@Test
@@ -374,13 +374,13 @@ public void verifyCorrelationIdAsKey() throws Exception {
374374
assertEquals("verifycorrel", kafkaMessage1.key());
375375
assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage1.keySchema());
376376
assertEquals("first message", kafkaMessage1.value());
377-
connectTask.commitRecord(kafkaMessage1);
377+
connectTask.commitRecord(kafkaMessage1, null);
378378

379379
final SourceRecord kafkaMessage2 = kafkaMessages.get(1);
380380
assertEquals("5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4", kafkaMessage2.key());
381381
assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage2.keySchema());
382382
assertEquals("second message", kafkaMessage2.value());
383-
connectTask.commitRecord(kafkaMessage2);
383+
connectTask.commitRecord(kafkaMessage2, null);
384384
}
385385

386386
@Test
@@ -408,7 +408,7 @@ public void verifyCorrelationIdBytesAsKey() throws Exception {
408408

409409
assertEquals("testmessagewithcorrelbytes", kafkaMessage.value());
410410

411-
connectTask.commitRecord(kafkaMessage);
411+
connectTask.commitRecord(kafkaMessage, null);
412412
}
413413

414414
@Test
@@ -435,7 +435,7 @@ public void verifyDestinationAsKey() throws Exception {
435435

436436
assertEquals("testmessagewithdest", kafkaMessage.value());
437437

438-
connectTask.commitRecord(kafkaMessage);
438+
connectTask.commitRecord(kafkaMessage, null);
439439
}
440440

441441
@Test
@@ -467,7 +467,7 @@ public void testSequenceStateMsgReadUnderMQTx() throws Exception {
467467
assertThat(stateMsgs1.size()).isEqualTo(1);
468468

469469
for (final SourceRecord m : kafkaMessages) {
470-
connectTask.commitRecord(m);
470+
connectTask.commitRecord(m, null);
471471
}
472472

473473
/// make commit do rollback when poll is called
@@ -651,7 +651,7 @@ public void verifyEmptyMessage() throws Exception {
651651
final SourceRecord kafkaMessage = kafkaMessages.get(0);
652652
assertNull(kafkaMessage.value());
653653

654-
connectTask.commitRecord(kafkaMessage);
654+
connectTask.commitRecord(kafkaMessage, null);
655655
}
656656

657657
@Test
@@ -674,7 +674,7 @@ public void verifyEmptyTextMessage() throws Exception {
674674
final SourceRecord kafkaMessage = kafkaMessages.get(0);
675675
assertNull(kafkaMessage.value());
676676

677-
connectTask.commitRecord(kafkaMessage);
677+
connectTask.commitRecord(kafkaMessage, null);
678678
}
679679

680680
@Test
@@ -752,7 +752,7 @@ public void verifyErrorToleranceMessages() throws Exception {
752752
final Map<?, ?> value = (Map<?, ?>) validRecord.value();
753753
assertThat(value.get("i")).isEqualTo(Long.valueOf(i));
754754

755-
connectTask.commitRecord(validRecord);
755+
connectTask.commitRecord(validRecord, null);
756756
}
757757
}
758758

@@ -797,7 +797,7 @@ public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll()
797797
final Map<?, ?> value = (Map<?, ?>) validRecord.value();
798798
assertThat(value.get("i")).isEqualTo(Long.valueOf(i - 1));
799799

800-
connectTask.commitRecord(validRecord);
800+
connectTask.commitRecord(validRecord, null);
801801
}
802802
}
803803

@@ -876,7 +876,7 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
876876
assertEquals(headers.lastWithName("__connect.errors.jms.message.id").value(), message.getJMSMessageID());
877877
assertEquals(headers.lastWithName("__connect.errors.jms.timestamp").value(), message.getJMSTimestamp());
878878
assertEquals(headers.lastWithName("__connect.errors.mq.queue").value(), DEFAULT_SOURCE_QUEUE);
879-
connectTask.commitRecord(dlqRecord);
879+
connectTask.commitRecord(dlqRecord, null);
880880
}
881881

882882
@Test
@@ -910,7 +910,7 @@ public void shouldHandleDifferentMessageTypesToDlq() throws Exception {
910910
for (final SourceRecord dlqRecord : processedRecords) {
911911
assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source");
912912
assertThat(dlqRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES);
913-
connectTask.commitRecord(dlqRecord);
913+
connectTask.commitRecord(dlqRecord, null);
914914
}
915915
}
916916

@@ -943,7 +943,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
943943
assertThat(headers.lastWithName("__connect.errors.exception.message").value())
944944
.isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");
945945

946-
connectTask.commitRecord(dlqRecord);
946+
connectTask.commitRecord(dlqRecord, null);
947947
}
948948

949949
@Test
@@ -984,7 +984,7 @@ public void shouldHandleMixOfValidAndInvalidMessagesWithDifferentFormats() throw
984984
validCount++;
985985
assertThat(record.topic()).isEqualTo("mytopic");
986986
}
987-
connectTask.commitRecord(record);
987+
connectTask.commitRecord(record, null);
988988
}
989989

990990
assertThat(validCount).isEqualTo(3);

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceIT.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void testPollGetSequenceId_GivenSequenceStateIsPresentOnQueue() throws Ex
126126
final List<SourceRecord> kafkaMessages = connectTask.poll();
127127

128128
for (final SourceRecord kafkaMessage : kafkaMessages) {
129-
connectTask.commitRecord(kafkaMessage);
129+
connectTask.commitRecord(kafkaMessage, null);
130130
}
131131

132132
assertThat(kafkaMessages)
@@ -145,7 +145,7 @@ public void testPollGetsSequenceId_GivenSequenceStateIsNotPresentOnQueue() throw
145145
final List<SourceRecord> kafkaMessages = connectTask.poll();
146146

147147
for (final SourceRecord kafkaMessage : kafkaMessages) {
148-
connectTask.commitRecord(kafkaMessage);
148+
connectTask.commitRecord(kafkaMessage, null);
149149
}
150150

151151
assertThat(kafkaMessages)
@@ -166,7 +166,7 @@ public void testPollEndsWithPutSequenceStateOnQueue_GivenMessagesHaveBeenReceive
166166
final List<SourceRecord> kafkaMessages = connectTask.poll();
167167

168168
for (final SourceRecord kafkaMessage : kafkaMessages) {
169-
connectTask.commitRecord(kafkaMessage);
169+
connectTask.commitRecord(kafkaMessage, null);
170170
}
171171

172172
assertThat(getSequenceStateAndAssertNotEmpty()).isEqualTo(new SequenceState(
@@ -215,7 +215,7 @@ private void pollAndAssert(int expectedBatchSize, long sequenceId) throws Except
215215
.contains(sequenceId);
216216

217217
for (final SourceRecord m : kafkaMessages) {
218-
connectTask.commitRecord(m);
218+
connectTask.commitRecord(m, null);
219219
}
220220

221221
assertThat(getSequenceStateAndAssertNotEmpty()).isEqualTo(new SequenceState(
@@ -246,7 +246,7 @@ public void testSourceOffset() throws Exception {
246246
final List<SourceRecord> sourceRecords = connectTask.poll();
247247

248248
for (final SourceRecord sourceRecord : sourceRecords) {
249-
connectTask.commitRecord(sourceRecord);
249+
connectTask.commitRecord(sourceRecord, null);
250250
}
251251

252252
assertThat(sourceRecords)
@@ -275,7 +275,7 @@ public void test_IfSequenceStateIsOnStateQueue_ThenActionIsREDELIVER_UNSENT_BATC
275275
final List<SourceRecord> kafkaMessages = connectTask.poll();
276276

277277
for (final SourceRecord kafkaMessage : kafkaMessages) {
278-
connectTask.commitRecord(kafkaMessage);
278+
connectTask.commitRecord(kafkaMessage, null);
279279
}
280280

281281
assertThat(kafkaMessages)
@@ -298,7 +298,7 @@ public void testConnectorFirstTimeRunStart_ActionIsStandardGet_AndStateNotInKafk
298298
final List<SourceRecord> kafkaMessages = connectTask.poll();
299299

300300
for (final SourceRecord kafkaMessage : kafkaMessages) {
301-
connectTask.commitRecord(kafkaMessage);
301+
connectTask.commitRecord(kafkaMessage, null);
302302
}
303303

304304
assertThat(kafkaMessages)
@@ -329,7 +329,7 @@ public void testOnlyOnceDisabled_NoStateSaved_AndSequenceIdIncrements() throws E
329329
kafkaMessages = connectTask.poll();
330330

331331
for (final SourceRecord kafkaMessage : kafkaMessages) {
332-
connectTask.commitRecord(kafkaMessage);
332+
connectTask.commitRecord(kafkaMessage, null);
333333
}
334334

335335
assertThat(kafkaMessages)
@@ -349,7 +349,7 @@ public void testOnlyOnceDisabled_NoStateSaved_AndSequenceIdIncrements() throws E
349349
kafkaMessages = connectTask.poll();
350350

351351
for (final SourceRecord kafkaMessage : kafkaMessages) {
352-
connectTask.commitRecord(kafkaMessage);
352+
connectTask.commitRecord(kafkaMessage, null);
353353
}
354354

355355
assertThat(kafkaMessages)

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceStartBehaviourIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void testOnlyOnceStartBehaviour_GivenNoSequenceStateIsPresentOnQueueOrKaf
128128
final List<SourceRecord> kafkaMessages = connectTask.poll();
129129

130130
for (final SourceRecord kafkaMessage : kafkaMessages) {
131-
connectTask.commitRecord(kafkaMessage);
131+
connectTask.commitRecord(kafkaMessage, null);
132132
}
133133

134134
// Check that MQ has been read from
@@ -169,7 +169,7 @@ public void testOnlyOnceStartBehaviour_GivenNoSequenceStateIsPresentOnQueueButOf
169169
final List<SourceRecord> kafkaMessages = connectTask.poll();
170170

171171
for (final SourceRecord kafkaMessage : kafkaMessages) {
172-
connectTask.commitRecord(kafkaMessage);
172+
connectTask.commitRecord(kafkaMessage, null);
173173
}
174174

175175
// Check that MQ has been read from
@@ -500,7 +500,7 @@ public void testOnlyOnceStartBehaviour_CrashAfterKafkaCommitBeforeMQCommit() thr
500500
assertThat(kafkaMessagesRoundOne.size()).isEqualTo(2);
501501

502502
for (final SourceRecord kafkaMessage : kafkaMessagesRoundOne) {
503-
connectTask.commitRecord(kafkaMessage);
503+
connectTask.commitRecord(kafkaMessage, null);
504504
}
505505

506506
assertThat(kafkaMessagesRoundOne)
@@ -518,7 +518,7 @@ public void testOnlyOnceStartBehaviour_CrashAfterKafkaCommitBeforeMQCommit() thr
518518
assertThat(kafkaMessagesRoundTwo.size()).isEqualTo(2);
519519

520520
for (final SourceRecord kafkaMessage : kafkaMessagesRoundTwo) {
521-
connectTask.commitRecord(kafkaMessage);
521+
connectTask.commitRecord(kafkaMessage, null);
522522
}
523523

524524
assertThat(kafkaMessagesRoundTwo)
@@ -540,7 +540,7 @@ public void testOnlyOnceStartBehaviour_CrashAfterKafkaCommitBeforeMQCommit() thr
540540
final List<SourceRecord> kafkaMessagesRoundThree = connectTask.poll();
541541

542542
for (final SourceRecord kafkaMessage : kafkaMessagesRoundThree) {
543-
connectTask.commitRecord(kafkaMessage);
543+
connectTask.commitRecord(kafkaMessage, null);
544544
}
545545

546546
// These messages would have been returned if onlyOnce wasn't implemented.

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.ibm.eventstreams.connect.mqsource.util.LogMessages;
2323
import com.ibm.eventstreams.connect.mqsource.util.ExceptionProcessor;
2424
import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
25+
26+
import org.apache.kafka.clients.producer.RecordMetadata;
2527
import org.apache.kafka.common.config.AbstractConfig;
2628
import org.apache.kafka.connect.errors.ConnectException;
2729
import org.apache.kafka.connect.source.SourceRecord;
@@ -533,7 +535,7 @@ public void stop() {
533535
* @throws InterruptedException
534536
*/
535537
@Override
536-
public void commitRecord(final SourceRecord record) throws InterruptedException {
538+
public void commitRecord(final SourceRecord record, final RecordMetadata metadata) throws InterruptedException {
537539
log.trace("[{}] Entry {}.commitRecord, record={}", Thread.currentThread().getId(), this.getClass().getName(),
538540
record);
539541

0 commit comments

Comments
 (0)