Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ buildscript {
classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:1.0.2'
classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8'
classpath "gradle.plugin.nl.javadude.gradle.plugins:license-gradle-plugin:0.14.0"
classpath 'org.jfrog.buildinfo:build-info-extractor-gradle:latest.release'
classpath 'org.jfrog.buildinfo:build-info-extractor-gradle:5.2.5'
Comment thread
vigy321 marked this conversation as resolved.
classpath 'com.github.jengelman.gradle.plugins:shadow:5.2.0'
classpath "org.shipkit:shipkit-auto-version:latest.release"
classpath "org.shipkit:shipkit-changelog:latest.release"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public Long getMillis(State state) {
}
};

// ms.hdfs.reader.parse.json.strings has default value false
// when true, HdfsReader inlines string field values that are shaped like a JSON
// object or array and parse successfully, instead of emitting them as JSON string primitives
BooleanProperties MSTAGE_HDFS_READER_PARSE_JSON_STRINGS =
Comment thread
vigy321 marked this conversation as resolved.
new BooleanProperties("ms.hdfs.reader.parse.json.strings", Boolean.FALSE);

// ms.http.maxConnections has default value 50 and max value 500
// 0 is interpreted as default
IntegerProperties MSTAGE_HTTP_CONN_MAX =
Expand Down Expand Up @@ -339,6 +342,7 @@ protected String getValidNonblankWithDefault(State state) {
MSTAGE_EXTRACT_PREPROCESSORS,
MSTAGE_EXTRACT_PREPROCESSORS_PARAMETERS,
MSTAGE_GRACE_PERIOD_DAYS,
MSTAGE_HDFS_READER_PARSE_JSON_STRINGS,
MSTAGE_HTTP_CONN_MAX,
MSTAGE_HTTP_CONN_PER_ROUTE_MAX,
MSTAGE_HTTP_CONN_TTL_SECONDS,
Expand Down
24 changes: 23 additions & 1 deletion cdi-core/src/main/java/com/linkedin/cdi/util/HdfsReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -29,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.cdi.configuration.PropertyCollection.*;
import static com.linkedin.cdi.configuration.StaticConstants.*;


Expand Down Expand Up @@ -206,13 +208,23 @@ private JsonArray readFileAsJsonArray(
@VisibleForTesting
private JsonObject selectFieldsFromGenericRecord(GenericRecord record, List<String> fields) {
JsonObject jsonObject = new JsonObject();
boolean inlineJsonStrings = MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.get(state);
for (String field: fields) {
Object valueObject = record.get(field);
Schema.Type fieldType = record.getSchema().getField(field).schema().getType();
if (valueObject == null || fieldType == Schema.Type.NULL) {
jsonObject.add(field, JsonNull.INSTANCE);
} else if (fieldType == Schema.Type.STRING || fieldType == Schema.Type.UNION) {
jsonObject.addProperty(field, EncryptionUtils.decryptGobblin(valueObject.toString(), state));
String decrypted = EncryptionUtils.decryptGobblin(valueObject.toString(), state);
if (inlineJsonStrings && isValidJson(decrypted)) {
try {
jsonObject.add(field, gson.fromJson(decrypted, JsonElement.class));
} catch (JsonSyntaxException e) {
jsonObject.addProperty(field, decrypted);
}
} else {
jsonObject.addProperty(field, decrypted);
}
} else if (fieldType == Schema.Type.ARRAY) {
jsonObject.add(field, gson.fromJson(valueObject.toString(), JsonArray.class));
} else if (fieldType == Schema.Type.RECORD) {
Expand All @@ -230,6 +242,16 @@ private JsonObject selectFieldsFromGenericRecord(GenericRecord record, List<Stri
return jsonObject;
}

/**
 * Cheap pre-check that tells whether a string is shaped like a JSON object or array.
 *
 * <p>Despite the name, this does not parse the value. It only inspects the first and last
 * characters of the trimmed string, so a {@code true} result means "worth handing to a
 * JSON parser", not "guaranteed to be well-formed JSON".
 *
 * @param value the candidate string, may be {@code null}
 * @return {@code true} when the trimmed value has at least two characters and is wrapped in a
 *     matching pair of delimiters, either <code>{ }</code> or {@code [ ]};
 *     {@code false} for {@code null} and for everything else
 */
private static boolean isValidJson(String value) {
  if (value == null) {
    return false;
  }

  final String candidate = value.trim();
  final int lastIndex = candidate.length() - 1;
  // Fewer than two characters cannot hold both an opening and a closing delimiter.
  if (lastIndex < 1) {
    return false;
  }

  final char closer = candidate.charAt(lastIndex);
  switch (candidate.charAt(0)) {
    case '{':
      return closer == '}';
    case '[':
      return closer == ']';
    default:
      return false;
  }
}

/**
* retrieve the filters from the secondary input definition
*
Expand Down
154 changes: 154 additions & 0 deletions cdi-core/src/test/java/com/linkedin/cdi/util/HdfsReaderTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2021 LinkedIn Corporation. All rights reserved.
// Licensed under the BSD-2 Clause license.
// See LICENSE in the project root for license information.

package com.linkedin.cdi.util;

import com.google.gson.JsonObject;
import java.util.Collections;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.gobblin.configuration.SourceState;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static com.linkedin.cdi.configuration.PropertyCollection.*;


public class HdfsReaderTest {
private static final String FIELD = "data";
private static final List<String> FIELDS = Collections.singletonList(FIELD);

private SourceState state;
private HdfsReader reader;

@BeforeMethod
public void setUp() {
state = new SourceState();
reader = new HdfsReader(state);
}

@Test
public void testStringField_whenFlagOn_andValueIsJsonObject_inlinesAsJsonObject() throws Exception {
state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true");
GenericRecord record = recordWithStringField(FIELD, "{\"key\":\"value\"}");

JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS);

Assert.assertTrue(result.get(FIELD).isJsonObject());
Assert.assertEquals(result.get(FIELD).getAsJsonObject().get("key").getAsString(), "value");
}

@Test
public void testStringField_whenFlagOn_andValueIsJsonArray_inlinesAsJsonArray() throws Exception {
state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true");
GenericRecord record = recordWithStringField(FIELD, "[{\"@context\":\"schema.org\"}]");

JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS);

Assert.assertTrue(result.get(FIELD).isJsonArray());
Assert.assertEquals(
result.get(FIELD).getAsJsonArray().get(0).getAsJsonObject().get("@context").getAsString(),
"schema.org");
}

@Test
public void testStringField_whenFlagOn_andValueIsPlainText_keepsAsJsonPrimitive() throws Exception {
state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true");
GenericRecord record = recordWithStringField(FIELD, "hello world");

JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS);

Assert.assertTrue(result.get(FIELD).isJsonPrimitive());
Assert.assertEquals(result.get(FIELD).getAsString(), "hello world");
}

@Test
public void testStringField_whenFlagOn_andValueIsMalformedJson_fallsBackToJsonPrimitive() throws Exception {
state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true");
GenericRecord record = recordWithStringField(FIELD, "[{incomplete]");

JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS);

Assert.assertTrue(result.get(FIELD).isJsonPrimitive());
Assert.assertEquals(result.get(FIELD).getAsString(), "[{incomplete]");
}

@Test
public void testStringField_whenFlagOff_preservesLegacyBehaviorForJsonContent() throws Exception {
GenericRecord record = recordWithStringField(FIELD, "{\"key\":\"value\"}");

JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS);

Assert.assertTrue(result.get(FIELD).isJsonPrimitive());
Assert.assertEquals(result.get(FIELD).getAsString(), "{\"key\":\"value\"}");
}

@Test
public void testStringField_whenFlagOff_preservesLegacyBehaviorForPlainText() throws Exception {
GenericRecord record = recordWithStringField(FIELD, "hello");

JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS);

Assert.assertEquals(result.get(FIELD).getAsString(), "hello");
}

@Test
public void testStringField_whenFlagOn_andValueIsNull_returnsJsonNull() throws Exception {
state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true");
GenericRecord record = recordWithStringField(FIELD, null);

JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS);

Assert.assertTrue(result.get(FIELD).isJsonNull());
}

@Test
public void testStringField_whenFlagOn_andValueIsEmptyString_keepsAsJsonPrimitive() throws Exception {
state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true");
GenericRecord record = recordWithStringField(FIELD, "");

JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS);

Assert.assertTrue(result.get(FIELD).isJsonPrimitive());
Assert.assertEquals(result.get(FIELD).getAsString(), "");
}

@Test
public void testNullableStringField_whenFlagOn_andValueIsJsonArray_inlinesAsJsonArray() throws Exception {
state.setProp(MSTAGE_HDFS_READER_PARSE_JSON_STRINGS.getConfig(), "true");
GenericRecord record = recordWithNullableStringField(FIELD, "[{\"@context\":\"schema.org\"}]");

JsonObject result = Whitebox.invokeMethod(reader, "selectFieldsFromGenericRecord", record, FIELDS);

Assert.assertTrue(result.get(FIELD).isJsonArray());
Assert.assertEquals(
result.get(FIELD).getAsJsonArray().get(0).getAsJsonObject().get("@context").getAsString(),
"schema.org");
}

private GenericRecord recordWithStringField(String key, String val) {
Comment thread
vigy321 marked this conversation as resolved.
Schema schema = SchemaBuilder.record("Test").namespace("com.linkedin.test")
.doc("Test record").fields()
.name(key).doc("test").type().stringType()
.noDefault().endRecord();
GenericRecord record = new GenericData.Record(schema);
record.put(key, val);
return record;
}

private GenericRecord recordWithNullableStringField(String key, String val) {
Schema schema = SchemaBuilder.record("Test").namespace("com.linkedin.test")
.doc("Test record").fields()
.name(key).doc("test").type().nullable().stringType()
.noDefault().endRecord();
GenericRecord record = new GenericData.Record(schema);
record.put(key, val);
return record;
}
}
37 changes: 37 additions & 0 deletions docs/parameters/ms.hdfs.reader.parse.json.strings.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# ms.hdfs.reader.parse.json.strings

**Tags**:

**Type**: boolean

**Format**: true/false

**Default value**: false

## Related
- [ms.secondary.input](ms.secondary.input.md)

## Description

When set to `true`, `HdfsReader` detects string-typed fields whose values parse as valid JSON (objects or
arrays) and inlines them as JSON elements in the outgoing payload, rather than escaping them as JSON string
primitives.

This is useful when a pre-serialized JSON document (including JSON-LD, which uses `@`-prefixed field names
that Avro schemas cannot express) is stored as an Avro `string` field and needs to be sent inline in an HTTP
POST body.

### Behavior

- `false` (default): string fields are always serialized as JSON string primitives. This is the existing
behavior and is preserved for backward compatibility.
- `true`: for each string field (including union-typed fields such as nullable strings), if the trimmed
value is wrapped in a matching pair of delimiters (`{`…`}` or `[`…`]`) and parses as valid JSON, the parsed
JSON element replaces the string primitive. Values that fail either check fall back to the default
string-primitive behavior.

### Example

Given an Avro record with `data: string` containing `[{"@context":"https://schema.org"}]`:

- Flag off: `{"data":"[{\"@context\":\"https://schema.org\"}]"}` (escaped)
- Flag on: `{"data":[{"@context":"https://schema.org"}]}` (inlined)
Loading