
Commit f0fabac

feat: support for schemas in JSON record builder (#174)
This pull request introduces support for emitting structured records from the JSON record builder. This allows the MQ Source Connector to read JSON string messages from MQ and produce them to Kafka using any standard Converter (e.g. in Avro or Protobuf formats if desired).

I've chosen to support Kafka Connect's JSON schema format, rather than the different (and more widely recognised) standard JSON Schema. While supporting standard JSON Schema would have simplified the user config in some respects, it would have left the MQ connector responsible for performing an ambiguous conversion from the user-provided JSON Schema to the schema type used by Connect. As there is not a 1:1 mapping between these two schema types, I think it would be difficult to do such a conversion in a way that always meets user expectations. Instead, by making the user provide a Connect JSON schema, I'm proposing that users manually convert any JSON Schema they may already have into a Connect schema, forcing them to make the appropriate choices when mapping between the two type systems. This was a difficult trade-off to make, as it favours unambiguous config over ease of config (if we assume that more users are comfortable writing standard JSON Schemas than Connect JSON schemas). To help catch confusion here, I've included validation to reject non-Connect schemas.

The JsonConverter dependency used by the JSON record builder supports this from Kafka Connect v4.2, so the simplest implementation would be to update the dependency in pom.xml to version 4.2, pass the schemas.enable and schema.content configuration properties straight through to the converter, and leave the Converter to do everything. That felt like an overly aggressive dependency jump, so in the interest of continuing to support Connect 3.x versions, I've implemented a fall-back that reuses the schema "envelope" approach present in JsonConverter 3.x. The additional string operations this incurs for every message will almost certainly impact performance, so I see this as a temporary workaround that we should remove as soon as we feel that Connect 4.x adoption is sufficient.

Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent 034f16c commit f0fabac
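For illustration, this is roughly what a message using the Connect-style schema "envelope" looks like - the format that the fall-back path reuses and that the new integration tests below send (the field names here are just examples):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "idx", "type": "int64" },
      { "field": "test", "type": "string" }
    ]
  },
  "payload": {
    "idx": 0,
    "test": "abcdef"
  }
}

When a schema is instead supplied through `mq.record.builder.json.schema.content`, each MQ message only needs to carry the bare payload object.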

File tree

11 files changed, +454 -20 lines changed


.github/ISSUE_TEMPLATE/BUG-REPORT.yml

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ body:
       description: What version of our software are you running?
       options:
         - latest
-        - 2.6.0 (Default)
+        - 2.7.0 (Default)
         - 1.3.5
         - older (<1.3.5)
   validations:

README.md

Lines changed: 3 additions & 1 deletion
@@ -306,12 +306,14 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
 | `mq.max.poll.blocked.time.ms` | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
 | `mq.client.reconnect.options` | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
 | `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
-| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
+| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
 | `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
 | `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
 | `mq.receive.max.poll.time.ms` | Maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until batch size or a receive returns null. | long | 0 | 0 or greater |
 | `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
 | `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |
+| `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder. If true, a schema must be provided - either using `mq.record.builder.json.schema.content` or by embedding a schema within each MQ message payload. | boolean | false | |
+| `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. If provided, this will be used in preference to any schema embedded in MQ messages. | string | | This should be a Kafka Connect schema, as used by JsonConverter. |
 
 ### Using a CCDT file
 
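As a sketch of how these options fit together, a connector configuration that enables the JSON record builder with a connector-level schema might look like the following (MQ connection settings are omitted, and the connector name shown here is illustrative):

{
  "name": "mq-source",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
    "topic": "mytopic",
    "mq.message.body.jms": "true",
    "mq.record.builder": "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder",
    "mq.record.builder.json.schemas.enable": "true",
    "mq.record.builder.json.schema.content": "{ \"type\": \"struct\", \"fields\": [ { \"field\": \"idx\", \"type\": \"int64\" } ] }"
  }
}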
pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
     <groupId>com.ibm.eventstreams.connect</groupId>
     <artifactId>kafka-connect-mq-source</artifactId>
     <packaging>jar</packaging>
-    <version>2.6.0</version>
+    <version>2.7.0</version>
     <name>kafka-connect-mq-source</name>
     <organization>
         <name>IBM Corporation</name>

src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java

Lines changed: 2 additions & 0 deletions
@@ -19,6 +19,7 @@
 import com.github.dockerjava.api.model.HostConfig;
 import com.github.dockerjava.api.model.PortBinding;
 import com.github.dockerjava.api.model.Ports;
+import com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder;
 import com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil;
 import com.ibm.mq.jms.MQConnectionFactory;
 import com.ibm.msg.client.wmq.WMQConstants;
@@ -102,6 +103,7 @@ public static Map<String, String> getDefaultConnectorProperties() {
         props.put("mq.channel.name", CHANNEL_NAME);
         props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
         props.put("mq.user.authentication.mqcsp", "false");
+        props.put("mq.record.builder", DefaultRecordBuilder.class.getCanonicalName());
         props.put("topic", "mytopic");
         return props;
     }

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 184 additions & 0 deletions
@@ -50,6 +50,7 @@
 import javax.jms.TextMessage;
 
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -167,6 +168,189 @@ public void verifyJmsJsonMessages() throws Exception {
         }
     }
 
+    // verify that user can use the standard approach for the JsonConverter
+    // of embedding schemas in message payloads (enabling this using a
+    // record builder config option)
+    @Test
+    public void verifyJmsSchemaMessages() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+        connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");
+
+        connectTask.start(connectorConfigProps);
+
+        final List<Message> messages = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            messages.add(getJmsContext().createTextMessage(
+                "{\n" +
+                "\"schema\": {\n" +
+                "  \"type\": \"struct\", \n" +
+                "  \"fields\": [\n" +
+                "    {\n" +
+                "      \"field\": \"idx\", \n" +
+                "      \"type\": \"int64\"\n" +
+                "    },\n" +
+                "    {\n" +
+                "      \"field\": \"test\", \n" +
+                "      \"type\": \"string\"\n" +
+                "    }" +
+                "  ]\n" +
+                "}, " +
+                "\"payload\": { " +
+                "  \"idx\": " + i + ", " +
+                "  \"test\" : \"abcdef\" " +
+                "}" +
+                "}"));
+        }
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+        final List<SourceRecord> kafkaMessages = connectTask.poll();
+        assertEquals(5, kafkaMessages.size());
+
+        for (int i = 0; i < 5; i++) {
+            final SourceRecord kafkaMessage = kafkaMessages.get(i);
+            assertNull(kafkaMessage.key());
+
+            assertNotNull(kafkaMessage.valueSchema());
+            assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
+            assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("test").schema());
+
+            final Struct value = (Struct) kafkaMessage.value();
+            assertEquals(Long.valueOf(i), value.getInt64("idx"));
+            assertEquals("abcdef", value.getString("test"));
+
+            connectTask.commitRecord(kafkaMessage, null);
+        }
+    }
+
+    // verify that a reusable schema can be provided to the JSON record builder
+    // as part of the connector config, so that this can be reused across
+    // multiple MQ messages
+    @Test
+    public void verifyJmsReusableSchemaMessages() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final String SCHEMA = "{\n" +
+            "  \"type\": \"struct\", \n" +
+            "  \"fields\": [\n" +
+            "    {\n" +
+            "      \"field\": \"idx\", \n" +
+            "      \"type\": \"int32\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"a\", \n" +
+            "      \"type\": \"string\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"b\", \n" +
+            "      \"type\": \"int64\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"c\", \n" +
+            "      \"type\": \"double\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"d\", \n" +
+            "      \"type\": \"boolean\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"e\", \n" +
+            "      \"type\": \"float\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"f\", \n" +
+            "      \"type\": \"array\",\n" +
+            "      \"items\": {\n" +
+            "        \"type\": \"string\"\n" +
+            "      }\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"g\", \n" +
+            "      \"type\": \"array\", \n" +
+            "      \"items\": {\n" +
+            "        \"type\": \"int32\"\n" +
+            "      }\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"h\", \n" +
+            "      \"type\": \"struct\", \n" +
+            "      \"fields\": [\n" +
+            "        {\n" +
+            "          \"field\": \"innerstr\", \n" +
+            "          \"type\": \"string\"\n" +
+            "        },\n" +
+            "        {\n" +
+            "          \"field\": \"innernum\", \n" +
+            "          \"type\": \"int64\"\n" +
+            "        }\n" +
+            "      ]\n" +
+            "    }\n" +
+            "  ]\n" +
+            "}";
+
+        final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+        connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");
+        connectorConfigProps.put("mq.record.builder.json.schema.content", SCHEMA);
+
+        connectTask.start(connectorConfigProps);
+
+        final List<Message> messages = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            messages.add(getJmsContext().createTextMessage(
+                "{ " +
+                "\"idx\": " + i + ", \n" +
+                "\"a\" : \"test\", \n" +
+                "\"b\" : 1234, \n" +
+                "\"c\" : 5.67, \n" +
+                "\"d\" : false, \n" +
+                "\"e\" : 12.34, \n" +
+                "\"f\" : [ \"a\", \"b\", \"c\" ], \n" +
+                "\"g\" : [ 1, 2, 3 ], \n" +
+                "\"h\" : { \"innerstr\" : \"testing\", \"innernum\" : 89 }" +
+                "}"));
+        }
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+        final List<SourceRecord> kafkaMessages = connectTask.poll();
+        assertEquals(5, kafkaMessages.size());
+
+        for (int i = 0; i < 5; i++) {
+            final SourceRecord kafkaMessage = kafkaMessages.get(i);
+            assertNull(kafkaMessage.key());
+
+            assertNotNull(kafkaMessage.valueSchema());
+            assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
+            assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("a").schema());
+            assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("b").schema());
+            assertEquals(Schema.FLOAT64_SCHEMA, kafkaMessage.valueSchema().field("c").schema());
+            assertEquals(Schema.BOOLEAN_SCHEMA, kafkaMessage.valueSchema().field("d").schema());
+            assertEquals(Schema.FLOAT32_SCHEMA, kafkaMessage.valueSchema().field("e").schema());
+            assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("f").schema().valueSchema());
+            assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("g").schema().valueSchema());
+            assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innerstr").schema());
+            assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innernum").schema());
+
+            final Struct value = (Struct) kafkaMessage.value();
+            assertEquals(Integer.valueOf(i), value.getInt32("idx"));
+            assertEquals("test", value.getString("a"));
+            assertEquals(Long.valueOf(1234), value.getInt64("b"));
+            assertEquals(Double.valueOf(5.67), value.getFloat64("c"));
+            assertEquals(false, value.getBoolean("d"));
+            assertEquals(Float.valueOf(12.34f), value.getFloat32("e"));
+            assertArrayEquals(new String[]{ "a", "b", "c" }, value.getArray("f").toArray(new String[]{}));
+            assertArrayEquals(new Integer[]{ 1, 2, 3 }, value.getArray("g").toArray(new Integer[]{}));
+            assertEquals("testing", value.getStruct("h").getString("innerstr"));
+            assertEquals(Long.valueOf(89), value.getStruct("h").getInt64("innernum"));
+
+            connectTask.commitRecord(kafkaMessage, null);
+        }
+    }
+
     @Test
     public void verifyMQMessage() throws Exception {
         connectTask = getSourceTaskWithEmptyKafkaOffset();

src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java

Lines changed: 6 additions & 2 deletions
@@ -65,6 +65,7 @@ public void buildFromJmsTextMessage() throws Exception {
 
         // use the builder to convert it to a Kafka record
         final JsonRecordBuilder builder = new JsonRecordBuilder();
+        builder.configure(getDefaultConnectorProperties());
         final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
 
         // verify the Kafka record
@@ -82,6 +83,7 @@ public void buildFromJmsBytesMessage() throws Exception {
 
         // use the builder to convert it to a Kafka record
         final JsonRecordBuilder builder = new JsonRecordBuilder();
+        builder.configure(getDefaultConnectorProperties());
         final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
 
         // verify the Kafka record
@@ -100,6 +102,7 @@ public void buildFromJmsMapMessage() throws Exception {
 
         // use the builder to convert it to a Kafka record
         final JsonRecordBuilder builder = new JsonRecordBuilder();
+        builder.configure(getDefaultConnectorProperties());
         final RecordBuilderException exc = assertThrows(RecordBuilderException.class, () -> {
             builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
         });
@@ -115,6 +118,7 @@ public void buildFromJmsTestJsonError() throws Exception {
 
         // use the builder to convert it to a Kafka record
         final JsonRecordBuilder builder = new JsonRecordBuilder();
+        builder.configure(getDefaultConnectorProperties());
         final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
         assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage());
     }
@@ -143,7 +147,7 @@ public void buildFromJmsTestErrorToleranceNone() throws Exception {
 
         // use the builder to convert it to a Kafka record
         final JsonRecordBuilder builder = new JsonRecordBuilder();
-        final HashMap<String, String> config = new HashMap<String, String>();
+        final Map<String, String> config = getDefaultConnectorProperties();
         config.put("errors.tolerance", "none");
         config.put("mq.message.body.jms", "true");
         config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -187,7 +191,7 @@ public void testToSourceRecord_JsonRecordBuilder_JsonMessage() throws Exception
         assertThat(sourceRecord).isNotNull();
         assertThat(sourceRecord.value()).isInstanceOf(Map.class);
         assertNull(sourceRecord.valueSchema()); // JSON with no schema
-
+
         // Verify JSON data
         @SuppressWarnings("unchecked")
         Map<String, Object> value = (Map<String, Object>) sourceRecord.value();
