Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public final class AvroConverter {

private static final String KEY_OPTION = "key.fields";
private static final String KEY_PREFIX_OPTION = "key.fields-prefix";
private static final String PRIMITIVE_KEY = "KEY";

private AvroConverter() {
}
Expand Down Expand Up @@ -130,7 +129,7 @@ public static Pair<Schema, Schema> avroKeyPayloadSchema(String namespace, String
String keyName = field.getName().substring(keyPrefix.length());

// Key is a primitive
if (keyNames.size() == 1 && keyName.equals(PRIMITIVE_KEY)) {
if (keyNames.size() == 1 && keyName.equals(AvroSchemas.PRIMITIVE_KEY_NAME)) {
primitiveKeySchema = avro(namespace, keySchemaName, field.getType());
} else {
keyBuilder.add(keyName, field.getType());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.linkedin.hoptimator.avro;

import org.apache.avro.Schema;


/**
* Implemented by Calcite {@link org.apache.calcite.schema.Table}s backed by native Avro metadata.
* Exposes source-of-truth key and value schemas to downstream consumers (connector deployers,
* resolvers) without round-tripping through Calcite's {@code RelDataType} — preserving namespaces,
* nested record names, reused record definitions, default values, and enum/fixed metadata that
* the type system flattens away.
*
* <p>Consumers pick the view that matches their need:
* <ul>
* <li>{@link #valueSchema()} — the record's data payload.
* <li>{@link #keySchema()} — the record's key schema, or {@code null} when the table has no
* distinct key concept. A struct key exposes its fields directly; a primitive key returns
* a primitive {@link Schema}.
* </ul>
*/
public interface AvroSchemaSource {

/**
* Returns the value/payload Avro schema — the data record's schema without any query-layer
* scaffolding like {@code KEY_}-prefixed key fields. Connector payload options should render
* this.
*/
Schema valueSchema();

/**
* Returns the key Avro schema, or {@code null} when this table has no distinct key concept.
* Struct keys expose their fields directly; primitive keys return a primitive {@link Schema}.
* SQL/query-layer consumers may merge this with {@link #valueSchema()} via
* {@link AvroSchemas#mergedAvroSchemaFor(AvroSchemaSource)}.
*/
default Schema keySchema() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.linkedin.hoptimator.avro;

import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.List;


/**
* Utilities for composing Avro schemas. The primary use case is Hoptimator-style stores that
* back each logical table with a separate key and value Avro schema and need to expose a single
* flattened record (key fields prefixed, followed by value fields) without round-tripping
* through Calcite's {@code RelDataType}.
*/
public final class AvroSchemas {

/**
* Standard Hoptimator prefix applied to fields contributed by a record-typed key when merging
* into the value schema for SQL/query consumption.
*/
public static final String KEY_PREFIX = "KEY_";

/**
* Standard Hoptimator field name used when a primitive key is merged into the value schema for
* SQL/query consumption.
*/
public static final String PRIMITIVE_KEY_NAME = "KEY";

private AvroSchemas() {
}

/**
* Produces a fresh {@link Schema.Field} clone. Avro's {@code Schema.setFields} rejects Fields
* that already belong to another record (it tracks the field's position as a mutable guard), so
* fields from an input schema cannot be handed to a new record directly. Cloning also lets
* callers rename fields (e.g., apply a {@code KEY_} prefix).
*
* <p>Copies every attribute Avro exposes on a Field: name (from the caller), schema reference
* (the type Schema is shared by reference — nested records, enums, namespaces, etc. survive
* intact), doc, default value, sort order, aliases, and custom properties.
*/
public static Schema.Field cloneField(String name, Schema.Field original) {
Object defaultVal = original.hasDefaultValue() ? original.defaultVal() : null;
Schema.Field clone = new Schema.Field(name, original.schema(), original.doc(), defaultVal,
original.order());
original.aliases().forEach(clone::addAlias);
original.getObjectProps().forEach(clone::addProp);
return clone;
}

/**
* Produces the merged Avro schema for an {@link AvroSchemaSource}: the value schema with key
* fields prepended using Hoptimator's standard convention ({@link #KEY_PREFIX} for struct keys,
* {@link #PRIMITIVE_KEY_NAME} field for primitive keys). When the source has no key
* ({@link AvroSchemaSource#keySchema()} returns {@code null}), returns the value schema
* unchanged.
*
* <p>Centralizes the Hoptimator merging convention so SQL/query-layer consumers (like
* {@code HoptimatorConnection.resolve()}) share one implementation.
*/
public static Schema mergedAvroSchemaFor(AvroSchemaSource source) {
Schema key = source.keySchema();
Schema value = source.valueSchema();
if (key == null) {
return value;
}
return mergeKeyIntoValue(key, value, KEY_PREFIX, PRIMITIVE_KEY_NAME);
}

/**
* Merges a key Avro schema and a value record schema into a single record that inherits the
* value schema's identity (namespace, name, doc, isError flag), aliases, and record-level
* custom properties. Struct keys contribute one field per key field, each prefixed with
* {@code keyPrefix}. Primitive keys (or any non-record key) contribute a single field named
* {@code primitiveKeyName} with the key schema as its type.
*
* @param keySchema key Avro schema. If a record, its fields are prefixed and prepended.
* Otherwise, a single primitive key field is prepended.
* @param valueSchema must be a {@link Schema.Type#RECORD}.
* @param keyPrefix prefix applied to each key field name when key is a record (e.g.
* {@code "KEY_"}).
* @param primitiveKeyName field name used when key is primitive (e.g. {@code "KEY"}).
*/
static Schema mergeKeyIntoValue(Schema keySchema, Schema valueSchema,
String keyPrefix, String primitiveKeyName) {
if (valueSchema.getType() != Schema.Type.RECORD) {
throw new IllegalArgumentException(
"Value schema must be a record; got " + valueSchema.getType());
}
List<Schema.Field> allFields = new ArrayList<>();
if (keySchema.getType() == Schema.Type.RECORD) {
for (Schema.Field kf : keySchema.getFields()) {
allFields.add(cloneField(keyPrefix + kf.name(), kf));
}
} else {
allFields.add(new Schema.Field(primitiveKeyName, keySchema, "Primitive key field.", null));
}
for (Schema.Field vf : valueSchema.getFields()) {
allFields.add(cloneField(vf.name(), vf));
}
Schema merged = Schema.createRecord(
valueSchema.getName(),
valueSchema.getDoc(),
valueSchema.getNamespace(),
valueSchema.isError());
merged.setFields(allFields);
valueSchema.getAliases().forEach(merged::addAlias);
valueSchema.getObjectProps().forEach(merged::addProp);
return merged;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package com.linkedin.hoptimator.avro;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;


class AvroSchemasTest {

// --- cloneField ---

@Test
void cloneFieldProducesUnownedCopy() {
Schema.Field original = new Schema.Field("a", Schema.create(Schema.Type.STRING), "doc", null);
// Add to a record so position is set (this is what setFields would do).
Schema parent = Schema.createRecord("P", null, "ns", false);
parent.setFields(List.of(original));
assertFalse(original.pos() == -1, "precondition: position assigned by setFields");

Schema.Field clone = AvroSchemas.cloneField("a", original);

// Cloned Field is unowned — can be installed in another record without Avro rejecting it.
Schema other = Schema.createRecord("Q", null, "ns", false);
other.setFields(List.of(clone));
assertEquals("a", other.getField("a").name());
}

@Test
void cloneFieldRenamesField() {
Schema.Field original = new Schema.Field("a", Schema.create(Schema.Type.STRING), null, null);
Schema.Field clone = AvroSchemas.cloneField("KEY_a", original);
assertEquals("KEY_a", clone.name());
assertSame(original.schema(), clone.schema(), "type schema shared by reference");
}

@Test
void cloneFieldPreservesOrderAliasesPropsAndDefault() {
Schema.Field original = new Schema.Field("a", Schema.create(Schema.Type.STRING), "doc",
"fallback", Schema.Field.Order.DESCENDING);
original.addProp("java", Map.of("class", "com.linkedin.common.urn.Urn"));
original.addProp("compliance", "NONE");
original.addAlias("legacyName");

Schema.Field clone = AvroSchemas.cloneField("a", original);

assertEquals("doc", clone.doc());
assertEquals("fallback", clone.defaultVal());
assertEquals(Schema.Field.Order.DESCENDING, clone.order());
assertTrue(clone.aliases().contains("legacyName"));
assertEquals(Map.of("class", "com.linkedin.common.urn.Urn"), clone.getObjectProp("java"));
assertEquals("NONE", clone.getObjectProp("compliance"));
}

@Test
void cloneFieldHandlesNoDefaultValue() {
Schema.Field original = new Schema.Field("a", Schema.create(Schema.Type.STRING), null, null);
// `original` has no default (the null passed above is the default-value arg; field has no
// default because it's not a nullable union). hasDefaultValue() is false.
assertFalse(original.hasDefaultValue());

Schema.Field clone = AvroSchemas.cloneField("a", original);

assertFalse(clone.hasDefaultValue(), "no-default fields clone without inventing one");
}

// --- mergeKeyIntoValue ---

@Test
void mergeKeyIntoValueInheritsValueSchemaIdentityAndProps() {
Schema keySchema = SchemaBuilder.record("Key").fields().requiredString("id").endRecord();
Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.foo")
.aliases("com.linkedin.foo.UserV1").doc("User doc")
.prop("owningTeam", "urn:li:internalTeam:feed")
.fields().requiredString("name").endRecord();

Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY");

assertEquals("User doc", merged.getDoc());
assertTrue(merged.getAliases().contains("com.linkedin.foo.UserV1"));
assertEquals("urn:li:internalTeam:feed", merged.getObjectProp("owningTeam"));
}

@Test
void mergeKeyIntoValueThrowsForNonRecordValue() {
assertThrows(IllegalArgumentException.class,
() -> AvroSchemas.mergeKeyIntoValue(
Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.STRING), "KEY_", "KEY"));
}

@Test
void mergeKeyIntoValueStructKeyGetsPrefix() {
Schema keySchema = SchemaBuilder.record("Key").namespace("com.linkedin.k").fields()
.requiredString("id").requiredInt("partition").endRecord();
Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
.requiredString("name").endRecord();

Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY");

assertEquals(List.of("KEY_id", "KEY_partition", "name"),
merged.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
assertEquals("com.linkedin.v", merged.getNamespace(), "merged inherits value namespace");
assertEquals("User", merged.getName(), "merged inherits value name");
}

@Test
void mergeKeyIntoValuePrimitiveKeyBecomesSingleNamedField() {
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
.requiredString("name").endRecord();

Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY");

assertEquals(2, merged.getFields().size());
assertEquals("KEY", merged.getFields().get(0).name());
assertEquals(Schema.Type.STRING, merged.getFields().get(0).schema().getType());
assertEquals("name", merged.getFields().get(1).name());
}

@Test
void mergeKeyIntoValuePreservesNestedRecordIdentities() {
// Nested records in value (or key) keep their source namespaces because the Schema reference
// is shared by cloneField — no round-trip through a type system.
Schema address = SchemaBuilder.record("Address").namespace("com.linkedin.addr").fields()
.requiredString("city").endRecord();
Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
.name("address").type(address).noDefault().endRecord();

Schema merged = AvroSchemas.mergeKeyIntoValue(
Schema.create(Schema.Type.STRING), valueSchema, "KEY_", "KEY");

Schema addrField = merged.getField("address").schema();
assertSame(address, addrField, "nested record schema shared by reference");
assertEquals("com.linkedin.addr", addrField.getNamespace());
}

@Test
void mergeKeyIntoValueReusedRecordsRenderAsNamedReferences() {
// Same Schema instance referenced twice in the value → Avro serializer writes the record
// definition once and references by FQN thereafter.
Schema shared = SchemaBuilder.record("Shared").namespace("com.linkedin.s").fields()
.requiredString("v").endRecord();
Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
.name("first").type(shared).noDefault()
.name("second").type(shared).noDefault()
.endRecord();

Schema merged = AvroSchemas.mergeKeyIntoValue(
Schema.create(Schema.Type.STRING), valueSchema, "KEY_", "KEY");
String json = merged.toString(false);

int firstDef = json.indexOf("\"name\":\"Shared\"");
int secondDef = json.indexOf("\"name\":\"Shared\"", firstDef + 1);
assertEquals(-1, secondDef, "reused record serialized once; got " + json);
}

@Test
void mergeKeyIntoValueKeyFieldsKeepFieldProps() {
// Custom props on key fields (common for LinkedIn schemas — "java", "validate", etc.) should
// survive the key→KEY_ rename.
Schema keySchema = SchemaBuilder.record("Key").fields()
.name("id").type().stringType().noDefault()
.endRecord();
keySchema.getField("id").addProp("compliance", "NONE");
Schema valueSchema = SchemaBuilder.record("V").namespace("v").fields()
.requiredString("x").endRecord();

Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY");

assertEquals("NONE", merged.getField("KEY_id").getObjectProp("compliance"));
}

@Test
void mergeKeyIntoValueSchemaRoundTripsThroughAvroParser() {
// End-to-end sanity: the merged schema is parseable — ensures we don't produce anything that
// would trip Avro's validation (e.g. duplicate names, unresolvable references).
Schema keySchema = SchemaBuilder.record("Key").namespace("com.linkedin.k").fields()
.requiredString("id").endRecord();
Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
.requiredString("name").endRecord();
Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY");

Schema reparsed = new Schema.Parser().parse(merged.toString(true));
assertNotNull(reparsed);
assertEquals("com.linkedin.v.User", reparsed.getFullName());
assertEquals(2, reparsed.getFields().size());
}
}
Loading
Loading