From 70228044689c7ea60d6575b750909db56328c2b7 Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Wed, 22 Apr 2026 07:38:14 -0700 Subject: [PATCH 1/6] Add ms.hdfs.reader.parse.json.strings for inlining JSON strings HdfsReader.selectFieldsFromGenericRecord serializes Avro string fields as JSON string primitives, which double-encodes pre-serialized JSON payloads (e.g. JSON-LD whose @-prefixed names are not valid Avro identifiers). When the new property is true, string values that parse as JSON objects or arrays are inlined via JsonParser.parseString; parse failures fall back to the existing primitive behavior. Default is false, so existing jobs are unaffected. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cdi/configuration/PropertyCollection.java | 3 + .../com/linkedin/cdi/util/HdfsReader.java | 25 +++- .../com/linkedin/cdi/util/HdfsReaderTest.java | 131 ++++++++++++++++++ .../ms.hdfs.reader.parse.json.strings.md | 37 +++++ 4 files changed, 195 insertions(+), 1 deletion(-) create mode 100644 cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java create mode 100644 docs/parameters/ms.hdfs.reader.parse.json.strings.md diff --git a/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java b/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java index e03f7ee..aa7cee0 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java @@ -102,6 +102,9 @@ public Long getMillis(State state) { } }; + BooleanProperties MSTAGE_HDFS_READER_PARSE_JSON_STRINGS = + new BooleanProperties("ms.hdfs.reader.parse.json.strings", Boolean.FALSE); + // ms.http.maxConnections has default value 50 and max value 500 // 0 is interpreted as default IntegerProperties MSTAGE_HTTP_CONN_MAX = diff --git a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java index 
5e153f7..dab97ca 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java @@ -10,6 +10,8 @@ import com.google.gson.JsonElement; import com.google.gson.JsonNull; import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -29,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.cdi.configuration.PropertyCollection.*; import static com.linkedin.cdi.configuration.StaticConstants.*; @@ -206,13 +209,23 @@ private JsonArray readFileAsJsonArray( @VisibleForTesting private JsonObject selectFieldsFromGenericRecord(GenericRecord record, List fields) { JsonObject jsonObject = new JsonObject(); + boolean inlineJsonStrings = MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.get(state); for (String field: fields) { Object valueObject = record.get(field); Schema.Type fieldType = record.getSchema().getField(field).schema().getType(); if (valueObject == null || fieldType == Schema.Type.NULL) { jsonObject.add(field, JsonNull.INSTANCE); } else if (fieldType == Schema.Type.STRING || fieldType == Schema.Type.UNION) { - jsonObject.addProperty(field, EncryptionUtils.decryptGobblin(valueObject.toString(), state)); + String decrypted = EncryptionUtils.decryptGobblin(valueObject.toString(), state); + if (inlineJsonStrings && looksLikeJson(decrypted)) { + try { + jsonObject.add(field, JsonParser.parseString(decrypted)); + } catch (JsonSyntaxException e) { + jsonObject.addProperty(field, decrypted); + } + } else { + jsonObject.addProperty(field, decrypted); + } } else if (fieldType == Schema.Type.ARRAY) { jsonObject.add(field, gson.fromJson(valueObject.toString(), JsonArray.class)); } else if (fieldType == Schema.Type.RECORD) { @@ -230,6 +243,16 @@ private JsonObject selectFieldsFromGenericRecord(GenericRecord record, List= 2 + && 
((t.charAt(0) == '{' && t.charAt(t.length() - 1) == '}') + || (t.charAt(0) == '[' && t.charAt(t.length() - 1) == ']')); + } + /** * retrieve the filters from the secondary input definition * diff --git a/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java b/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java new file mode 100644 index 0000000..9a49913 --- /dev/null +++ b/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java @@ -0,0 +1,131 @@ +// Copyright 2021 LinkedIn Corporation. All rights reserved. +// Licensed under the BSD-2 Clause license. +// See LICENSE in the project root for license information. + +package com.linkedin.cdi.util; + +import com.google.gson.JsonObject; +import java.util.Collections; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.gobblin.configuration.SourceState; +import org.powermock.reflect.Whitebox; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static com.linkedin.cdi.configuration.PropertyCollection.*; + + +public class HdfsReaderTest { + private static final String FIELD = "data"; + private static final List FIELDS = Collections.singletonList(FIELD); + + private SourceState state; + private HdfsReader reader; + + @BeforeMethod + public void setUp() { + state = new SourceState(); + reader = new HdfsReader(state); + } + + @Test + public void testStringField_whenFlagOn_andValueIsJsonObject_inlinesAsJsonObject() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, "{\"key\":\"value\"}"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonObject()); + 
Assert.assertEquals(result.get(FIELD).getAsJsonObject().get("key").getAsString(), "value"); + } + + @Test + public void testStringField_whenFlagOn_andValueIsJsonArray_inlinesAsJsonArray() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, "[{\"@context\":\"schema.org\"}]"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonArray()); + Assert.assertEquals( + result.get(FIELD).getAsJsonArray().get(0).getAsJsonObject().get("@context").getAsString(), + "schema.org"); + } + + @Test + public void testStringField_whenFlagOn_andValueIsPlainText_keepsAsJsonPrimitive() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, "hello world"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonPrimitive()); + Assert.assertEquals(result.get(FIELD).getAsString(), "hello world"); + } + + @Test + public void testStringField_whenFlagOn_andValueIsMalformedJson_fallsBackToJsonPrimitive() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, "[{incomplete]"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonPrimitive()); + Assert.assertEquals(result.get(FIELD).getAsString(), "[{incomplete]"); + } + + @Test + public void testStringField_whenFlagOff_preservesLegacyBehaviorForJsonContent() throws Exception { + GenericRecord record = recordWithStringField(FIELD, "{\"key\":\"value\"}"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + 
Assert.assertTrue(result.get(FIELD).isJsonPrimitive()); + Assert.assertEquals(result.get(FIELD).getAsString(), "{\"key\":\"value\"}"); + } + + @Test + public void testStringField_whenFlagOff_preservesLegacyBehaviorForPlainText() throws Exception { + GenericRecord record = recordWithStringField(FIELD, "hello"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertEquals(result.get(FIELD).getAsString(), "hello"); + } + + @Test + public void testStringField_whenFlagOn_andValueIsNull_returnsJsonNull() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, null); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonNull()); + } + + @Test + public void testStringField_whenFlagOn_andValueIsEmptyString_keepsAsJsonPrimitive() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithStringField(FIELD, ""); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonPrimitive()); + Assert.assertEquals(result.get(FIELD).getAsString(), ""); + } + + private GenericRecord recordWithStringField(String key, String val) { + Schema schema = SchemaBuilder.record("Test").namespace("com.linkedin.test") + .doc("Test record").fields() + .name(key).doc("test").type().stringType() + .noDefault().endRecord(); + GenericRecord record = new GenericData.Record(schema); + record.put(key, val); + return record; + } +} diff --git a/docs/parameters/ms.hdfs.reader.parse.json.strings.md b/docs/parameters/ms.hdfs.reader.parse.json.strings.md new file mode 100644 index 0000000..2fec001 --- /dev/null +++ b/docs/parameters/ms.hdfs.reader.parse.json.strings.md @@ -0,0 +1,37 @@ +# 
ms.hdfs.reader.parse.json.strings + +**Tags**: + +**Type**: boolean + +**Format**: true/false + +**Default value**: false + +## Related +- [ms.secondary.input](ms.secondary.input.md) + +## Description + +When set to `true`, `HdfsReader` detects string-typed fields whose values parse as valid JSON (objects or +arrays) and inlines them as JSON elements in the outgoing payload, rather than escaping them as JSON string +primitives. + +This is useful when a pre-serialized JSON document (including JSON-LD, which uses `@`-prefixed field names +that Avro schemas cannot express) is stored as an Avro `string` field and needs to be sent inline in an HTTP +POST body. + +### Behavior + +- `false` (default): string fields are always serialized as JSON string primitives. This is the existing + behavior and is preserved for backward compatibility. +- `true`: for each string field, if the trimmed value begins with `{`/`[` and ends with `}`/`]` and parses + as valid JSON, the parsed JSON element replaces the string primitive. Parse failures fall back to the + default string-primitive behavior. + +### Example + +Given an Avro record with `data: string` containing `[{"@context":"https://schema.org"}]`: + +- Flag off: `{"data":"[{\"@context\":\"https://schema.org\"}]"}` (escaped) +- Flag on: `{"data":[{"@context":"https://schema.org"}]}` (inlined) From 96ef2c36ea66a0cb0b252d80214cca1ddd681616 Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Sun, 26 Apr 2026 21:45:14 -0700 Subject: [PATCH 2/6] Use legacy JsonParser API for Gson 2.6.2 compatibility DIL pins Gson 2.6.2 (gradle/scripts/dependencyDefinitions.gradle:31), which doesn't have the static JsonParser.parseString(String) introduced in Gson 2.8.6. Switch to the instance method new JsonParser().parse(s), which has identical semantics on this version and throws the same JsonSyntaxException on malformed input. Caught by local test run. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java index dab97ca..e58d463 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java @@ -219,7 +219,7 @@ private JsonObject selectFieldsFromGenericRecord(GenericRecord record, List Date: Sun, 26 Apr 2026 21:55:07 -0700 Subject: [PATCH 3/6] Use gson.fromJson with JsonElement.class for style consistency Replace new JsonParser().parse(decrypted) with gson.fromJson(decrypted, JsonElement.class). Both calls go through the same Gson parser and throw the same JsonSyntaxException on malformed input, but fromJson matches the style of the adjacent ARRAY/RECORD branches, reuses the existing gson field instead of allocating a JsonParser per record, and avoids the @Deprecated marker on JsonParser's instance method in newer Gson versions. All 8 HdfsReaderTest cases continue to pass. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java index e58d463..721f8a5 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java @@ -10,7 +10,6 @@ import com.google.gson.JsonElement; import com.google.gson.JsonNull; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import java.util.ArrayList; import java.util.HashMap; @@ -219,7 +218,7 @@ private JsonObject selectFieldsFromGenericRecord(GenericRecord record, List Date: Mon, 27 Apr 2026 04:11:08 -0700 Subject: [PATCH 4/6] Address PR review: register property in allProperties; cover UNION schema path Two review fixes from gautamshanu: 1. Add MSTAGE_HDFS_READER_PARSE_JSON_STRINGS to the allProperties list in PropertyCollection so it participates in startup validation alongside every other MSTAGE_* property. 2. Add a test that uses a nullable string schema (Avro UNION ["string", "null"]) to exercise the UNION branch of selectFieldsFromGenericRecord. Existing tests covered Schema.Type.STRING; production traffic for the target use case is UNION-typed, so this widens coverage to that path. All 9 HdfsReaderTest cases pass. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cdi/configuration/PropertyCollection.java | 1 + .../com/linkedin/cdi/util/HdfsReaderTest.java | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java b/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java index aa7cee0..8d73b9b 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java @@ -342,6 +342,7 @@ protected String getValidNonblankWithDefault(State state) { MSTAGE_EXTRACT_PREPROCESSORS, MSTAGE_EXTRACT_PREPROCESSORS_PARAMETERS, MSTAGE_GRACE_PERIOD_DAYS, + MSTAGE_HDFS_READER_PARSE_JSON_STRINGS, MSTAGE_HTTP_CONN_MAX, MSTAGE_HTTP_CONN_PER_ROUTE_MAX, MSTAGE_HTTP_CONN_TTL_SECONDS, diff --git a/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java b/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java index 9a49913..c2ac82c 100644 --- a/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java +++ b/cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java @@ -119,6 +119,19 @@ public void testStringField_whenFlagOn_andValueIsEmptyString_keepsAsJsonPrimitiv Assert.assertEquals(result.get(FIELD).getAsString(), ""); } + @Test + public void testNullableStringField_whenFlagOn_andValueIsJsonArray_inlinesAsJsonArray() throws Exception { + state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true"); + GenericRecord record = recordWithNullableStringField(FIELD, "[{\"@context\":\"schema.org\"}]"); + + JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS); + + Assert.assertTrue(result.get(FIELD).isJsonArray()); + Assert.assertEquals( + result.get(FIELD).getAsJsonArray().get(0).getAsJsonObject().get("@context").getAsString(), + "schema.org"); + } + private GenericRecord recordWithStringField(String 
key, String val) { Schema schema = SchemaBuilder.record("Test").namespace("com.linkedin.test") .doc("Test record").fields() @@ -128,4 +141,14 @@ private GenericRecord recordWithStringField(String key, String val) { record.put(key, val); return record; } + + private GenericRecord recordWithNullableStringField(String key, String val) { + Schema schema = SchemaBuilder.record("Test").namespace("com.linkedin.test") + .doc("Test record").fields() + .name(key).doc("test").type().nullable().stringType() + .noDefault().endRecord(); + GenericRecord record = new GenericData.Record(schema); + record.put(key, val); + return record; + } } From 9e7bc513b7e4a131ec507368da18b4575e65f558 Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Wed, 29 Apr 2026 08:50:08 -0700 Subject: [PATCH 5/6] Address PR review: rename looksLikeJson helper - Rename `looksLikeJson` to `isValidJson` for clarity. - Rename parameter `s` to `value` and local `t` to `trimmed`. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../java/com/linkedin/cdi/util/HdfsReader.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java index 721f8a5..2851ba4 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java @@ -216,7 +216,7 @@ private JsonObject selectFieldsFromGenericRecord(GenericRecord record, List= 2 - && ((t.charAt(0) == '{' && t.charAt(t.length() - 1) == '}') - || (t.charAt(0) == '[' && t.charAt(t.length() - 1) == ']')); + String trimmed = value.trim(); + return trimmed.length() >= 2 + && ((trimmed.charAt(0) == '{' && trimmed.charAt(trimmed.length() - 1) == '}') + || (trimmed.charAt(0) == '[' && trimmed.charAt(trimmed.length() - 1) == ']')); } /** From 4c2a7684fbb6430a7f24baedcd703785ba2a3eac Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Wed, 29 Apr 2026 03:18:42 -0700 
Subject: [PATCH 6/6] Pin build-info-extractor-gradle to 5.2.5 to fix Gradle 6.8.1 build MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The buildscript classpath used `latest.release`, which now resolves to 6.0.4. JFrog 6.0.0+ depends on jackson-databind:2.15.4, transitively pulling jackson-core:2.15.4 — a multi-release JAR containing Java 17 (major version 61) class files. Gradle 6.8.1's classpath instrumenter walks every class via a bundled ASM that does not understand Java 17 bytecode and fails with: Failed to create Jar file ~/.gradle/caches/jars-8/<hash>/jackson-core-2.15.4.jar Caused by: java.lang.IllegalArgumentException: Unsupported class file major version 61 5.2.5 is the last release on jackson-databind:2.14.1, which is Java 8 compatible and configures cleanly under Gradle 6.8.1. --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 95fde42..c7d3e02 100644 --- a/build.gradle +++ b/build.gradle @@ -23,7 +23,7 @@ buildscript { classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:1.0.2' classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8' classpath "gradle.plugin.nl.javadude.gradle.plugins:license-gradle-plugin:0.14.0" - classpath 'org.jfrog.buildinfo:build-info-extractor-gradle:latest.release' + classpath 'org.jfrog.buildinfo:build-info-extractor-gradle:5.2.5' classpath 'com.github.jengelman.gradle.plugins:shadow:5.2.0' classpath "org.shipkit:shipkit-auto-version:latest.release" classpath "org.shipkit:shipkit-changelog:latest.release"