diff --git a/build.gradle b/build.gradle index 95fde42..c7d3e02 100644 --- a/build.gradle +++ b/build.gradle @@ -23,7 +23,7 @@ buildscript { classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:1.0.2' classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8' classpath "gradle.plugin.nl.javadude.gradle.plugins:license-gradle-plugin:0.14.0" - classpath 'org.jfrog.buildinfo:build-info-extractor-gradle:latest.release' + classpath 'org.jfrog.buildinfo:build-info-extractor-gradle:5.2.5' classpath 'com.github.jengelman.gradle.plugins:shadow:5.2.0' classpath "org.shipkit:shipkit-auto-version:latest.release" classpath "org.shipkit:shipkit-changelog:latest.release" diff --git a/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java b/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java index e03f7ee..8d73b9b 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java @@ -102,6 +102,9 @@ public Long getMillis(State state) { } }; + BooleanProperties MSTAGE_HDFS_READER_PARSE_JSON_STRINGS = + new BooleanProperties("ms.hdfs.reader.parse.json.strings", Boolean.FALSE); + // ms.http.maxConnections has default value 50 and max value 500 // 0 is interpreted as default IntegerProperties MSTAGE_HTTP_CONN_MAX = @@ -339,6 +342,7 @@ protected String getValidNonblankWithDefault(State state) { MSTAGE_EXTRACT_PREPROCESSORS, MSTAGE_EXTRACT_PREPROCESSORS_PARAMETERS, MSTAGE_GRACE_PERIOD_DAYS, + MSTAGE_HDFS_READER_PARSE_JSON_STRINGS, MSTAGE_HTTP_CONN_MAX, MSTAGE_HTTP_CONN_PER_ROUTE_MAX, MSTAGE_HTTP_CONN_TTL_SECONDS, diff --git a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java index 5e153f7..2851ba4 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java @@ -10,6 +10,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonNull; import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.cdi.configuration.PropertyCollection.*; import static com.linkedin.cdi.configuration.StaticConstants.*; @@ -206,13 +208,23 @@ private JsonArray readFileAsJsonArray( @VisibleForTesting private JsonObject selectFieldsFromGenericRecord(GenericRecord record, List fields) { JsonObject jsonObject = new JsonObject(); + boolean inlineJsonStrings = MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.get(state); for (String field: fields) { Object valueObject = record.get(field); Schema.Type fieldType = record.getSchema().getField(field).schema().getType(); if (valueObject == null || fieldType == Schema.Type.NULL) { jsonObject.add(field, JsonNull.INSTANCE); } else if (fieldType == Schema.Type.STRING || fieldType == Schema.Type.UNION) { - jsonObject.addProperty(field, EncryptionUtils.decryptGobblin(valueObject.toString(), state)); + String decrypted = EncryptionUtils.decryptGobblin(valueObject.toString(), state); + if (inlineJsonStrings && isValidJson(decrypted)) { + try { + jsonObject.add(field, gson.fromJson(decrypted, JsonElement.class)); + } catch (JsonSyntaxException e) { + jsonObject.addProperty(field, decrypted); + } + } else { + jsonObject.addProperty(field, decrypted); + } } else if (fieldType == Schema.Type.ARRAY) { jsonObject.add(field, gson.fromJson(valueObject.toString(), JsonArray.class)); } else if (fieldType == Schema.Type.RECORD) { @@ -230,6 +242,16 @@ private JsonObject selectFieldsFromGenericRecord(GenericRecord record, List= 2 + && ((trimmed.charAt(0) == '{' && trimmed.charAt(trimmed.length() - 1) == '}') + || (trimmed.charAt(0) == '[' && trimmed.charAt(trimmed.length() - 1) == ']')); + } + /** * retrieve the filters from the secondary input definition * diff --git a/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java b/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java new file mode 100644 index 0000000..c2ac82c --- /dev/null +++ b/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java @@ -0,0 +1,154 @@ +// Copyright 2021 LinkedIn Corporation. All rights reserved. +// Licensed under the BSD-2 Clause license. +// See LICENSE in the project root for license information. + +package com.linkedin.cdi.util; + +import com.google.gson.JsonObject; +import java.util.Collections; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.gobblin.configuration.SourceState; +import org.powermock.reflect.Whitebox; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static com.linkedin.cdi.configuration.PropertyCollection.*; + + +public class HdfsReaderTest { + private static final String FIELD = "data"; + private static final List FIELDS = Collections.singletonList(FIELD); + + private SourceState state; + private HdfsReader reader; + + @BeforeMethod + public void setUp() { + state = new SourceState(); + reader = new HdfsReader(state); + } + + @Test + public void testStringField_whenFlagOn_andValueIsJsonObject_inlinesAsJsonObject() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, "{\"key\":\"value\"}"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonObject()); + Assert.assertEquals(result.get(FIELD).getAsJsonObject().get("key").getAsString(), "value"); + } + + @Test + public void testStringField_whenFlagOn_andValueIsJsonArray_inlinesAsJsonArray() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, "[{\"@context\":\"schema.org\"}]"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonArray()); + Assert.assertEquals( + result.get(FIELD).getAsJsonArray().get(0).getAsJsonObject().get("@context").getAsString(), + "schema.org"); + } + + @Test + public void testStringField_whenFlagOn_andValueIsPlainText_keepsAsJsonPrimitive() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, "hello world"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonPrimitive()); + Assert.assertEquals(result.get(FIELD).getAsString(), "hello world"); + } + + @Test + public void testStringField_whenFlagOn_andValueIsMalformedJson_fallsBackToJsonPrimitive() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, "[{incomplete]"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonPrimitive()); + Assert.assertEquals(result.get(FIELD).getAsString(), "[{incomplete]"); + } + + @Test + public void testStringField_whenFlagOff_preservesLegacyBehaviorForJsonContent() throws Exception { + GenericRecord record = recordWithStringField(FIELD, "{\"key\":\"value\"}"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonPrimitive()); + Assert.assertEquals(result.get(FIELD).getAsString(), "{\"key\":\"value\"}"); + } + + @Test + public void testStringField_whenFlagOff_preservesLegacyBehaviorForPlainText() throws Exception { + GenericRecord record = recordWithStringField(FIELD, "hello"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertEquals(result.get(FIELD).getAsString(), "hello"); + } + + @Test + public void testStringField_whenFlagOn_andValueIsNull_returnsJsonNull() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, null); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonNull()); + } + + @Test + public void testStringField_whenFlagOn_andValueIsEmptyString_keepsAsJsonPrimitive() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, ""); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonPrimitive()); + Assert.assertEquals(result.get(FIELD).getAsString(), ""); + } + + @Test + public void testNullableStringField_whenFlagOn_andValueIsJsonArray_inlinesAsJsonArray() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithNullableStringField(FIELD, "[{\"@context\":\"schema.org\"}]"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonArray()); + Assert.assertEquals( + result.get(FIELD).getAsJsonArray().get(0).getAsJsonObject().get("@context").getAsString(), + "schema.org"); + } + + private GenericRecord recordWithStringField(String key, String val) { + Schema schema = SchemaBuilder.record("Test").namespace("com.linkedin.test") + .doc("Test record").fields() + .name(key).doc("test").type().stringType() + .noDefault().endRecord(); + GenericRecord record = new GenericData.Record(schema); + record.put(key, val); + return record; + } + + private GenericRecord recordWithNullableStringField(String key, String val) { + Schema schema = SchemaBuilder.record("Test").namespace("com.linkedin.test") + .doc("Test record").fields() + .name(key).doc("test").type().nullable().stringType() + .noDefault().endRecord(); + GenericRecord record = new GenericData.Record(schema); + record.put(key, val); + return record; + } +} diff --git a/docs/parameters/ms.hdfs.reader.parse.json.strings.md b/docs/parameters/ms.hdfs.reader.parse.json.strings.md new file mode 100644 index 0000000..2fec001 --- /dev/null +++ b/docs/parameters/ms.hdfs.reader.parse.json.strings.md @@ -0,0 +1,37 @@ +# ms.hdfs.reader.parse.json.strings + +**Tags**: + +**Type**: boolean + +**Format**: true/false + +**Default value**: false + +## Related +- [ms.secondary.input](ms.secondary.input.md) + +## Description + +When set to `true`, `HdfsReader` detects string-typed fields whose values parse as valid JSON (objects or +arrays) and inlines them as JSON elements in the outgoing payload, rather than escaping them as JSON string +primitives. + +This is useful when a pre-serialized JSON document (including JSON-LD, which uses `@`-prefixed field names +that Avro schemas cannot express) is stored as an Avro `string` field and needs to be sent inline in an HTTP +POST body. + +### Behavior + +- `false` (default): string fields are always serialized as JSON string primitives. This is the existing + behavior and is preserved for backward compatibility. +- `true`: for each string field, if the trimmed value begins with `{`/`[` and ends with `}`/`]` and parses + as valid JSON, the parsed JSON element replaces the string primitive. Parse failures fall back to the + default string-primitive behavior. + +### Example + +Given an Avro record with `data: string` containing `[{"@context":"https://schema.org"}]`: + +- Flag off: `{"data":"[{\"@context\":\"https://schema.org\"}]"}` (escaped) +- Flag on: `{"data":[{"@context":"https://schema.org"}]}` (inlined)