From 827706fd3ac46a9b4f1dc819b5090194a20e90a9 Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Wed, 22 Apr 2026 10:55:23 -0400 Subject: [PATCH 1/2] Add avro schema passthrough --- .../hoptimator/avro/AvroConverter.java | 3 +- .../hoptimator/avro/AvroSchemaProvider.java | 39 ++++ .../linkedin/hoptimator/avro/AvroSchemas.java | 111 ++++++++++ .../hoptimator/avro/AvroSchemasTest.java | 198 ++++++++++++++++++ .../hoptimator/jdbc/HoptimatorConnection.java | 52 ++++- .../jdbc/HoptimatorConnectionTest.java | 110 ++++++++++ hoptimator-k8s/build.gradle | 2 + .../linkedin/hoptimator/k8s/K8sConnector.java | 42 +++- .../hoptimator/k8s/K8sConnectorTest.java | 111 ++++++++++ hoptimator-kafka/build.gradle | 1 + .../linkedin/hoptimator/kafka/KafkaTopic.java | 3 +- hoptimator-mysql/build.gradle | 1 + .../hoptimator/mysql/MySqlDeployer.java | 23 +- hoptimator-util/build.gradle | 2 + .../util/planner/HoptimatorJdbcTable.java | 61 +++++- .../util/planner/HoptimatorJdbcTableTest.java | 100 +++++++++ .../hoptimator/venice/VeniceStore.java | 34 ++- .../hoptimator/venice/VeniceStoreTest.java | 84 ++++++++ 18 files changed, 940 insertions(+), 37 deletions(-) create mode 100644 hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaProvider.java create mode 100644 hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemas.java create mode 100644 hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroSchemasTest.java diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java index f7b2ac88..0870d4db 100644 --- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java +++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java @@ -25,7 +25,6 @@ public final class AvroConverter { private static final String KEY_OPTION = "key.fields"; private static final String KEY_PREFIX_OPTION = "key.fields-prefix"; - 
private static final String PRIMITIVE_KEY = "KEY"; private AvroConverter() { } @@ -130,7 +129,7 @@ public static Pair avroKeyPayloadSchema(String namespace, String String keyName = field.getName().substring(keyPrefix.length()); // Key is a primitive - if (keyNames.size() == 1 && keyName.equals(PRIMITIVE_KEY)) { + if (keyNames.size() == 1 && keyName.equals(AvroSchemas.PRIMITIVE_KEY_NAME)) { primitiveKeySchema = avro(namespace, keySchemaName, field.getType()); } else { keyBuilder.add(keyName, field.getType()); diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaProvider.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaProvider.java new file mode 100644 index 00000000..e04f68f5 --- /dev/null +++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaProvider.java @@ -0,0 +1,39 @@ +package com.linkedin.hoptimator.avro; + +import org.apache.avro.Schema; + + +/** + * Implemented by Calcite {@link org.apache.calcite.schema.Table}s backed by native Avro metadata. + * Exposes source-of-truth key and value schemas to downstream consumers (connector deployers, + * resolvers) without round-tripping through Calcite's {@code RelDataType} — preserving namespaces, + * nested record names, reused record definitions, default values, and enum/fixed metadata that + * the type system flattens away. + * + *

<p>Consumers pick the view that matches their need: + *

+ */ +public interface AvroSchemaProvider { + + /** + * Returns the value/payload Avro schema — the data record's schema without any query-layer + * scaffolding like {@code KEY_}-prefixed key fields. Connector payload options should render + * this. + */ + Schema valueSchema(); + + /** + * Returns the key Avro schema, or {@code null} when this table has no distinct key concept. + * Struct keys expose their fields directly; primitive keys return a primitive {@link Schema}. + * SQL/query-layer consumers may merge this with {@link #valueSchema()} via + * {@link AvroSchemas#mergedAvroSchemaFor(AvroSchemaProvider)}. + */ + default Schema keySchema() { + return null; + } +} diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemas.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemas.java new file mode 100644 index 00000000..aa2df42d --- /dev/null +++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemas.java @@ -0,0 +1,111 @@ +package com.linkedin.hoptimator.avro; + +import org.apache.avro.Schema; + +import java.util.ArrayList; +import java.util.List; + + +/** + * Utilities for composing Avro schemas. The primary use case is Hoptimator-style stores that + * back each logical table with a separate key and value Avro schema and need to expose a single + * flattened record (key fields prefixed, followed by value fields) without round-tripping + * through Calcite's {@code RelDataType}. + */ +public final class AvroSchemas { + + /** + * Standard Hoptimator prefix applied to fields contributed by a record-typed key when merging + * into the value schema for SQL/query consumption. + */ + public static final String KEY_PREFIX = "KEY_"; + + /** + * Standard Hoptimator field name used when a primitive key is merged into the value schema for + * SQL/query consumption. 
+ */ + public static final String PRIMITIVE_KEY_NAME = "KEY"; + + private AvroSchemas() { + } + + /** + * Produces a fresh {@link Schema.Field} clone. Avro's {@code Schema.setFields} rejects Fields + * that already belong to another record (it tracks the field's position as a mutable guard), so + * fields from an input schema cannot be handed to a new record directly. Cloning also lets + * callers rename fields (e.g., apply a {@code KEY_} prefix). + * + *

Copies every attribute Avro exposes on a Field: name (from the caller), schema reference + * (the type Schema is shared by reference — nested records, enums, namespaces, etc. survive + * intact), doc, default value, sort order, aliases, and custom properties. + */ + public static Schema.Field cloneField(String name, Schema.Field original) { + Object defaultVal = original.hasDefaultValue() ? original.defaultVal() : null; + Schema.Field clone = new Schema.Field(name, original.schema(), original.doc(), defaultVal, + original.order()); + original.aliases().forEach(clone::addAlias); + original.getObjectProps().forEach(clone::addProp); + return clone; + } + + /** + * Produces the merged Avro schema for an {@link AvroSchemaProvider}: the value schema with key + * fields prepended using Hoptimator's standard convention ({@link #KEY_PREFIX} for struct keys, + * {@link #PRIMITIVE_KEY_NAME} field for primitive keys). When the provider has no key + * ({@link AvroSchemaProvider#keySchema()} returns {@code null}), returns the value schema + * unchanged. + * + *

Centralizes the Hoptimator merging convention so SQL/query-layer consumers (like + * {@code HoptimatorConnection.resolve()}) share one implementation. + */ + public static Schema mergedAvroSchemaFor(AvroSchemaProvider provider) { + Schema key = provider.keySchema(); + Schema value = provider.valueSchema(); + if (key == null) { + return value; + } + return mergeKeyIntoValue(key, value, KEY_PREFIX, PRIMITIVE_KEY_NAME); + } + + /** + * Merges a key Avro schema and a value record schema into a single record that inherits the + * value schema's identity (namespace, name, doc, isError flag), aliases, and record-level + * custom properties. Struct keys contribute one field per key field, each prefixed with + * {@code keyPrefix}. Primitive keys (or any non-record key) contribute a single field named + * {@code primitiveKeyName} with the key schema as its type. + * + * @param keySchema key Avro schema. If a record, its fields are prefixed and prepended. + * Otherwise, a single primitive key field is prepended. + * @param valueSchema must be a {@link Schema.Type#RECORD}. + * @param keyPrefix prefix applied to each key field name when key is a record (e.g. + * {@code "KEY_"}). + * @param primitiveKeyName field name used when key is primitive (e.g. {@code "KEY"}). 
+ */ + static Schema mergeKeyIntoValue(Schema keySchema, Schema valueSchema, + String keyPrefix, String primitiveKeyName) { + if (valueSchema.getType() != Schema.Type.RECORD) { + throw new IllegalArgumentException( + "Value schema must be a record; got " + valueSchema.getType()); + } + List allFields = new ArrayList<>(); + if (keySchema.getType() == Schema.Type.RECORD) { + for (Schema.Field kf : keySchema.getFields()) { + allFields.add(cloneField(keyPrefix + kf.name(), kf)); + } + } else { + allFields.add(new Schema.Field(primitiveKeyName, keySchema, "Primitive key field.", null)); + } + for (Schema.Field vf : valueSchema.getFields()) { + allFields.add(cloneField(vf.name(), vf)); + } + Schema merged = Schema.createRecord( + valueSchema.getName(), + valueSchema.getDoc(), + valueSchema.getNamespace(), + valueSchema.isError()); + merged.setFields(allFields); + valueSchema.getAliases().forEach(merged::addAlias); + valueSchema.getObjectProps().forEach(merged::addProp); + return merged; + } +} diff --git a/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroSchemasTest.java b/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroSchemasTest.java new file mode 100644 index 00000000..e81e4ed4 --- /dev/null +++ b/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroSchemasTest.java @@ -0,0 +1,198 @@ +package com.linkedin.hoptimator.avro; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +class AvroSchemasTest { + + // --- cloneField 
--- + + @Test + void cloneFieldProducesUnownedCopy() { + Schema.Field original = new Schema.Field("a", Schema.create(Schema.Type.STRING), "doc", null); + // Add to a record so position is set (this is what setFields would do). + Schema parent = Schema.createRecord("P", null, "ns", false); + parent.setFields(List.of(original)); + assertFalse(original.pos() == -1, "precondition: position assigned by setFields"); + + Schema.Field clone = AvroSchemas.cloneField("a", original); + + // Cloned Field is unowned — can be installed in another record without Avro rejecting it. + Schema other = Schema.createRecord("Q", null, "ns", false); + other.setFields(List.of(clone)); + assertEquals("a", other.getField("a").name()); + } + + @Test + void cloneFieldRenamesField() { + Schema.Field original = new Schema.Field("a", Schema.create(Schema.Type.STRING), null, null); + Schema.Field clone = AvroSchemas.cloneField("KEY_a", original); + assertEquals("KEY_a", clone.name()); + assertSame(original.schema(), clone.schema(), "type schema shared by reference"); + } + + @Test + void cloneFieldPreservesOrderAliasesPropsAndDefault() { + Schema.Field original = new Schema.Field("a", Schema.create(Schema.Type.STRING), "doc", + "fallback", Schema.Field.Order.DESCENDING); + original.addProp("java", Map.of("class", "com.linkedin.common.urn.Urn")); + original.addProp("compliance", "NONE"); + original.addAlias("legacyName"); + + Schema.Field clone = AvroSchemas.cloneField("a", original); + + assertEquals("doc", clone.doc()); + assertEquals("fallback", clone.defaultVal()); + assertEquals(Schema.Field.Order.DESCENDING, clone.order()); + assertTrue(clone.aliases().contains("legacyName")); + assertEquals(Map.of("class", "com.linkedin.common.urn.Urn"), clone.getObjectProp("java")); + assertEquals("NONE", clone.getObjectProp("compliance")); + } + + @Test + void cloneFieldHandlesNoDefaultValue() { + Schema.Field original = new Schema.Field("a", Schema.create(Schema.Type.STRING), null, null); + // `original` 
has no default (the null passed above is the default-value arg; field has no + // default because it's not a nullable union). hasDefaultValue() is false. + assertFalse(original.hasDefaultValue()); + + Schema.Field clone = AvroSchemas.cloneField("a", original); + + assertFalse(clone.hasDefaultValue(), "no-default fields clone without inventing one"); + } + + // --- mergeKeyIntoValue --- + + @Test + void mergeKeyIntoValueInheritsValueSchemaIdentityAndProps() { + Schema keySchema = SchemaBuilder.record("Key").fields().requiredString("id").endRecord(); + Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.foo") + .aliases("com.linkedin.foo.UserV1").doc("User doc") + .prop("owningTeam", "urn:li:internalTeam:feed") + .fields().requiredString("name").endRecord(); + + Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY"); + + assertEquals("User doc", merged.getDoc()); + assertTrue(merged.getAliases().contains("com.linkedin.foo.UserV1")); + assertEquals("urn:li:internalTeam:feed", merged.getObjectProp("owningTeam")); + } + + @Test + void mergeKeyIntoValueThrowsForNonRecordValue() { + assertThrows(IllegalArgumentException.class, + () -> AvroSchemas.mergeKeyIntoValue( + Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.STRING), "KEY_", "KEY")); + } + + @Test + void mergeKeyIntoValueStructKeyGetsPrefix() { + Schema keySchema = SchemaBuilder.record("Key").namespace("com.linkedin.k").fields() + .requiredString("id").requiredInt("partition").endRecord(); + Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields() + .requiredString("name").endRecord(); + + Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY"); + + assertEquals(List.of("KEY_id", "KEY_partition", "name"), + merged.getFields().stream().map(Schema.Field::name).collect(Collectors.toList())); + assertEquals("com.linkedin.v", merged.getNamespace(), "merged inherits value namespace"); + 
assertEquals("User", merged.getName(), "merged inherits value name"); + } + + @Test + void mergeKeyIntoValuePrimitiveKeyBecomesSingleNamedField() { + Schema keySchema = Schema.create(Schema.Type.STRING); + Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields() + .requiredString("name").endRecord(); + + Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY"); + + assertEquals(2, merged.getFields().size()); + assertEquals("KEY", merged.getFields().get(0).name()); + assertEquals(Schema.Type.STRING, merged.getFields().get(0).schema().getType()); + assertEquals("name", merged.getFields().get(1).name()); + } + + @Test + void mergeKeyIntoValuePreservesNestedRecordIdentities() { + // Nested records in value (or key) keep their source namespaces because the Schema reference + // is shared by cloneField — no round-trip through a type system. + Schema address = SchemaBuilder.record("Address").namespace("com.linkedin.addr").fields() + .requiredString("city").endRecord(); + Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields() + .name("address").type(address).noDefault().endRecord(); + + Schema merged = AvroSchemas.mergeKeyIntoValue( + Schema.create(Schema.Type.STRING), valueSchema, "KEY_", "KEY"); + + Schema addrField = merged.getField("address").schema(); + assertSame(address, addrField, "nested record schema shared by reference"); + assertEquals("com.linkedin.addr", addrField.getNamespace()); + } + + @Test + void mergeKeyIntoValueReusedRecordsRenderAsNamedReferences() { + // Same Schema instance referenced twice in the value → Avro serializer writes the record + // definition once and references by FQN thereafter. 
+ Schema shared = SchemaBuilder.record("Shared").namespace("com.linkedin.s").fields() + .requiredString("v").endRecord(); + Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields() + .name("first").type(shared).noDefault() + .name("second").type(shared).noDefault() + .endRecord(); + + Schema merged = AvroSchemas.mergeKeyIntoValue( + Schema.create(Schema.Type.STRING), valueSchema, "KEY_", "KEY"); + String json = merged.toString(false); + + int firstDef = json.indexOf("\"name\":\"Shared\""); + int secondDef = json.indexOf("\"name\":\"Shared\"", firstDef + 1); + assertEquals(-1, secondDef, "reused record serialized once; got " + json); + } + + @Test + void mergeKeyIntoValueKeyFieldsKeepFieldProps() { + // Custom props on key fields (common for LinkedIn schemas — "java", "validate", etc.) should + // survive the key→KEY_ rename. + Schema keySchema = SchemaBuilder.record("Key").fields() + .name("id").type().stringType().noDefault() + .endRecord(); + keySchema.getField("id").addProp("compliance", "NONE"); + Schema valueSchema = SchemaBuilder.record("V").namespace("v").fields() + .requiredString("x").endRecord(); + + Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY"); + + assertEquals("NONE", merged.getField("KEY_id").getObjectProp("compliance")); + } + + @Test + void mergeKeyIntoValueSchemaRoundTripsThroughAvroParser() { + // End-to-end sanity: the merged schema is parseable — ensures we don't produce anything that + // would trip Avro's validation (e.g. duplicate names, unresolvable references). 
+ Schema keySchema = SchemaBuilder.record("Key").namespace("com.linkedin.k").fields() + .requiredString("id").endRecord(); + Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.v").fields() + .requiredString("name").endRecord(); + Schema merged = AvroSchemas.mergeKeyIntoValue(keySchema, valueSchema, "KEY_", "KEY"); + + Schema reparsed = new Schema.Parser().parse(merged.toString(true)); + assertNotNull(reparsed); + assertEquals("com.linkedin.v.User", reparsed.getFullName()); + assertEquals(2, reparsed.getFields().size()); + } +} diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java index e42008a9..597ea79d 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java @@ -4,6 +4,8 @@ import com.linkedin.hoptimator.Sink; import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.avro.AvroConverter; +import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemas; import com.linkedin.hoptimator.util.ConnectionService; import com.linkedin.hoptimator.util.DelegatingConnection; import org.apache.avro.Schema; @@ -12,6 +14,8 @@ import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.RelOptMaterialization; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; import org.apache.calcite.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,17 +51,20 @@ public HoptimatorConnection(CalciteConnection connection, Properties connectionP public ResolvedTable resolve(List tablePath, Map hints) throws SQLException { try { - String tableSql = "SELECT * FROM " + tablePath.stream() - .map(x -> "\"" + x + "\"") - .collect(Collectors.joining(".")); - 
RelNode tableRel = HoptimatorDriver.convert(this, tableSql).root.rel; - String namespace = "com.linkedin." + Schema avroSchema = providerSchema(tablePath); + if (avroSchema == null) { + String tableSql = "SELECT * FROM " + tablePath.stream() + .map(x -> "\"" + x + "\"") + .collect(Collectors.joining(".")); + RelNode tableRel = HoptimatorDriver.convert(this, tableSql).root.rel; + String namespace = "com.linkedin." + tablePath.stream() .map(x -> x.toLowerCase(Locale.ROOT)) .limit(tablePath.size() - 1) .collect(Collectors.joining(".")); - String schemaName = tablePath.get(tablePath.size() - 1).toLowerCase(Locale.ROOT); - Schema avroSchema = AvroConverter.avro(namespace, schemaName, tableRel.getRowType()); + String schemaName = tablePath.get(tablePath.size() - 1).toLowerCase(Locale.ROOT); + avroSchema = AvroConverter.avro(namespace, schemaName, tableRel.getRowType()); + } String database = databaseName(this.createPrepareContext(), tablePath); Source source = new Source(database, tablePath, hints); Sink sink = new Sink(database, tablePath, hints); @@ -68,6 +75,37 @@ public ResolvedTable resolve(List tablePath, Map hints) } } + /** + * Returns the upstream provider's merged Avro schema (value + prefixed key fields where + * applicable) when the resolved table at {@code tablePath} implements + * {@link AvroSchemaProvider}, or {@code null} when no provider is available and the caller + * should fall back to RelDataType synthesis. + * + *

{@code resolve()} merges key+value because its consumers (the SQL/query layer, the Flink + * catalog, the {@code !resolve} CLI command) expect keys to appear as columns in the table's + * logical schema. Connector deployers, which render connector payload options, call + * {@link AvroSchemaProvider#valueSchema()} directly instead. + */ + private Schema providerSchema(List tablePath) { + return providerSchemaAt(connection.getRootSchema(), tablePath); + } + + static Schema providerSchemaAt(SchemaPlus root, List tablePath) { + SchemaPlus schema = root; + for (String part : Util.skipLast(tablePath)) { + if (schema == null) { + return null; + } + schema = schema.subSchemas().get(part); + } + if (schema == null) { + return null; + } + Table table = schema.tables().get(tablePath.get(tablePath.size() - 1)); + return table instanceof AvroSchemaProvider + ? AvroSchemas.mergedAvroSchemaFor((AvroSchemaProvider) table) : null; + } + @Override public Statement createStatement() throws SQLException { return connection.createStatement(); diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorConnectionTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorConnectionTest.java index fbc17260..b0a041e3 100644 --- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorConnectionTest.java +++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorConnectionTest.java @@ -1,11 +1,19 @@ package com.linkedin.hoptimator.jdbc; import com.linkedin.hoptimator.Source; +import com.linkedin.hoptimator.avro.AvroSchemaProvider; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.jdbc.CalcitePrepare; +import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.RelOptMaterialization; import org.apache.calcite.rel.type.RelDataType; +import 
org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.AbstractTable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -26,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -208,6 +217,107 @@ void resolveReturnsNonNullTypeForExistingTwoPartPath() throws SQLException { } } + @Test + void providerSchemaAtReturnsNullForUnknownPath() { + SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); + assertNull(HoptimatorConnection.providerSchemaAt(root, + List.of("missingDb", "missingTable"))); + } + + @Test + void providerSchemaAtReturnsNullWhenTableIsNotProvider() { + SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); + SchemaPlus db = root.add("db", new AbstractSchema()); + db.add("plain", new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory factory) { + throw new UnsupportedOperationException(); + } + }); + + assertNull(HoptimatorConnection.providerSchemaAt(root, List.of("db", "plain"))); + } + + @Test + void providerSchemaAtReturnsValueSchemaWhenProviderHasNoKey() { + // No key → the merge helper returns the value schema unchanged. 
+ Schema value = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() + .requiredString("name").endRecord(); + SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); + SchemaPlus db = root.add("db", new AbstractSchema()); + db.add("user", new ProviderTable(value, null)); + + Schema result = HoptimatorConnection.providerSchemaAt(root, List.of("db", "user")); + + assertSame(value, result, "no key → merged is just the value schema"); + } + + @Test + void providerSchemaAtMergesKeyAndValueWhenProviderExposesBoth() { + // Both key and value → merged view with KEY_-prefixed key fields prepended before value. + // resolve() uses this so SQL queries can reference key columns by name. + Schema value = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() + .requiredString("name").endRecord(); + Schema key = SchemaBuilder.record("UserKey").namespace("com.linkedin.keyns").fields() + .requiredString("id").endRecord(); + SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); + SchemaPlus db = root.add("db", new AbstractSchema()); + db.add("user", new ProviderTable(value, key)); + + Schema result = HoptimatorConnection.providerSchemaAt(root, List.of("db", "user")); + + assertNotNull(result); + assertEquals(2, result.getFields().size()); + assertEquals("KEY_id", result.getFields().get(0).name()); + assertEquals("name", result.getFields().get(1).name()); + // Merged record inherits value's namespace/name. 
+ assertEquals("com.linkedin.foo", result.getNamespace()); + assertEquals("User", result.getName()); + } + + @Test + void providerSchemaAtMergesPrimitiveKeyAsSingleKeyField() { + Schema value = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() + .requiredString("name").endRecord(); + Schema key = Schema.create(Schema.Type.STRING); + SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); + SchemaPlus db = root.add("db", new AbstractSchema()); + db.add("user", new ProviderTable(value, key)); + + Schema result = HoptimatorConnection.providerSchemaAt(root, List.of("db", "user")); + + assertNotNull(result); + assertEquals(2, result.getFields().size()); + assertEquals("KEY", result.getFields().get(0).name()); + assertEquals(Schema.Type.STRING, result.getFields().get(0).schema().getType()); + assertEquals("name", result.getFields().get(1).name()); + } + + private static final class ProviderTable extends AbstractTable implements AvroSchemaProvider { + private final Schema value; + private final Schema key; + + ProviderTable(Schema value, Schema key) { + this.value = value; + this.key = key; + } + + @Override + public Schema valueSchema() { + return value; + } + + @Override + public Schema keySchema() { + return key; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory factory) { + throw new UnsupportedOperationException(); + } + } + @Test void testMultipleLogHooksAllInvoked() { List hook1Messages = new ArrayList<>(); diff --git a/hoptimator-k8s/build.gradle b/hoptimator-k8s/build.gradle index 9a753b73..d219bea5 100644 --- a/hoptimator-k8s/build.gradle +++ b/hoptimator-k8s/build.gradle @@ -6,8 +6,10 @@ plugins { dependencies { implementation project(':hoptimator-api') + implementation project(':hoptimator-avro') implementation project(':hoptimator-jdbc') implementation project(':hoptimator-util') + implementation libs.avro implementation libs.calcite.server implementation libs.kubernetes.client diff --git 
a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java index f273fa23..6ca2e304 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java @@ -4,13 +4,20 @@ import com.linkedin.hoptimator.Connector; import com.linkedin.hoptimator.Sink; import com.linkedin.hoptimator.Source; +import com.linkedin.hoptimator.avro.AvroConverter; +import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemas; import com.linkedin.hoptimator.jdbc.HoptimatorDriver; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateSpec; import com.linkedin.hoptimator.util.Template; +import org.apache.avro.Schema; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.util.Util; import java.io.IOException; import java.io.StringReader; @@ -35,7 +42,6 @@ class K8sConnector implements Connector { private static final String KEY_OPTION = "keys"; private static final String KEY_PREFIX_OPTION = "keyPrefix"; private static final String KEY_TYPE_OPTION = "keyType"; - private static final String KEY_PREFIX = "KEY_"; K8sConnector(Source source, K8sContext context) { this.source = source; @@ -59,6 +65,7 @@ public Map configure() throws SQLException { .with("catalog", source.catalog()) .with("schema", source.schema()) .with("table", source.table()) + .with("avroValueSchema", () -> avroValueSchema(source, sourceRowType).toString()) .with(options); List templates = tableTemplateApi.list() .stream() @@ -107,6 +114,35 @@ private Map getConnectorHints(Map options, Strin 
.collect(Collectors.toMap(e -> e.getKey().substring(connectorHintPrefix.length() + 1), Map.Entry::getValue)); } + /** + * Renders the value Avro schema for the {@code {{avroValueSchema}}} template variable. Prefers + * the upstream table's native value schema when it implements {@link AvroSchemaProvider} — keys + * aren't included, because the connector handles them separately via {@code key.fields}. Falls + * back to synthesizing from the flat row type, which loses source-level namespaces and nested + * record identities. + */ + private Schema avroValueSchema(Source source, RelDataType sourceRowType) { + Table table = lookupTable(source); + if (table instanceof AvroSchemaProvider) { + Schema provided = ((AvroSchemaProvider) table).valueSchema(); + if (provided != null) { + return provided; + } + } + return AvroConverter.avro("com.linkedin.hoptimator", source.table(), sourceRowType); + } + + private Table lookupTable(Source source) { + SchemaPlus schema = context.connection().calciteConnection().getRootSchema(); + for (String part : Util.skipLast(source.path())) { + if (schema == null) { + return null; + } + schema = schema.subSchemas().get(part); + } + return schema == null ? 
null : schema.tables().get(source.table()); + } + @VisibleForTesting static Map addKeysAsOption(Map options, RelDataType rowType) { Map newOptions = new LinkedHashMap<>(options); @@ -118,11 +154,11 @@ static Map addKeysAsOption(Map options, RelDataT String keyString = rowType.getFieldList().stream() .map(RelDataTypeField::getName) - .filter(name -> name.startsWith(KEY_PREFIX)) + .filter(name -> name.startsWith(AvroSchemas.KEY_PREFIX)) .collect(Collectors.joining(";")); if (!keyString.isEmpty()) { newOptions.put(KEY_OPTION, keyString.replaceAll("\\s+", "")); - newOptions.put(KEY_PREFIX_OPTION, KEY_PREFIX); + newOptions.put(KEY_PREFIX_OPTION, AvroSchemas.KEY_PREFIX); newOptions.put(KEY_TYPE_OPTION, "RECORD"); } return newOptions; diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sConnectorTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sConnectorTest.java index 234aacc6..fd8bc177 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sConnectorTest.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sConnectorTest.java @@ -2,15 +2,23 @@ import com.linkedin.hoptimator.Sink; import com.linkedin.hoptimator.Source; +import com.linkedin.hoptimator.avro.AvroSchemaProvider; import com.linkedin.hoptimator.jdbc.HoptimatorConnection; import com.linkedin.hoptimator.jdbc.HoptimatorDriver; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateSpec; import io.kubernetes.client.openapi.models.V1ObjectMeta; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeSystem; +import 
org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractTable; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.jupiter.api.Test; @@ -34,6 +42,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -345,4 +355,105 @@ K8sApi createTableTemplateApi( assertEquals("kafka", config.get("connector")); } + + @Test + void configureAvroValueSchemaUsesProviderWhenAvailable() throws SQLException { + // When the resolved table implements AvroSchemaProvider, its native Avro schema is rendered + // into the template as-is — no round-trip through RelDataType. + RelDataType rowType = new RelDataTypeFactory.Builder(typeFactory) + .add("KEY_id", SqlTypeName.VARCHAR) + .add("name", SqlTypeName.VARCHAR).build(); + hoptimatorDriverMock.when(() -> HoptimatorDriver.rowType(any(Source.class), any())) + .thenReturn(rowType); + + Schema nativeSchema = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() + .requiredString("KEY_id") + .requiredString("name") + .endRecord(); + Source source = new Source("testdb", Arrays.asList("schema", "table"), + Collections.emptyMap()); + installRootSchemaWithTable(source, new ProviderTable(nativeSchema)); + + FakeK8sApi templateApi = new FakeK8sApi<>( + List.of(new V1alpha1TableTemplate() + .metadata(new V1ObjectMeta().name("tpl")) + .spec(new V1alpha1TableTemplateSpec() + .databases(List.of("testdb")) + .connector("avroValueSchema={{avroValueSchema}}")))); + + K8sConnector connector = makeConnector(source, templateApi); + Map config = connector.configure(); + + String rendered = config.get("avroValueSchema"); + assertNotNull(rendered); + 
assertTrue(rendered.contains("\"namespace\":\"com.linkedin.foo\""), + "provider namespace preserved; got " + rendered); + assertTrue(rendered.contains("\"name\":\"User\""), + "provider record name preserved; got " + rendered); + } + + @Test + void configureAvroValueSchemaFallsBackToRowTypeSynthesisWhenNoProvider() throws SQLException { + // No provider on the resolved table → synthesize from the row type using AvroConverter.avro. + RelDataType rowType = new RelDataTypeFactory.Builder(typeFactory) + .add("name", SqlTypeName.VARCHAR).build(); + hoptimatorDriverMock.when(() -> HoptimatorDriver.rowType(any(Source.class), any())) + .thenReturn(rowType); + + Source source = new Source("testdb", Arrays.asList("schema", "table"), + Collections.emptyMap()); + installRootSchemaWithTable(source, new PlainTable()); + + FakeK8sApi templateApi = new FakeK8sApi<>( + List.of(new V1alpha1TableTemplate() + .metadata(new V1ObjectMeta().name("tpl")) + .spec(new V1alpha1TableTemplateSpec() + .databases(List.of("testdb")) + .connector("avroValueSchema={{avroValueSchema}}")))); + + K8sConnector connector = makeConnector(source, templateApi); + Map config = connector.configure(); + + assertTrue(config.get("avroValueSchema").contains("\"namespace\":\"com.linkedin.hoptimator.table\""), + "synthesized fallback produces the legacy namespace pattern; got " + config.get("avroValueSchema")); + } + + private void installRootSchemaWithTable(Source source, Table table) { + SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); + SchemaPlus parent = root; + for (String part : source.path().subList(0, source.path().size() - 1)) { + parent = parent.add(part, new org.apache.calcite.schema.impl.AbstractSchema()); + } + parent.add(source.table(), table); + + CalciteConnection calciteConn = mock(CalciteConnection.class); + when(calciteConn.getRootSchema()).thenReturn(root); + when(connection.calciteConnection()).thenReturn(calciteConn); + when(mockContext.connection()).thenReturn(connection); + } 
+ + private static final class PlainTable extends AbstractTable { + @Override + public RelDataType getRowType(RelDataTypeFactory factory) { + throw new UnsupportedOperationException(); + } + } + + private static final class ProviderTable extends AbstractTable implements AvroSchemaProvider { + private final Schema value; + + ProviderTable(Schema value) { + this.value = value; + } + + @Override + public Schema valueSchema() { + return value; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory factory) { + throw new UnsupportedOperationException(); + } + } } diff --git a/hoptimator-kafka/build.gradle b/hoptimator-kafka/build.gradle index 3544972f..7cd1c11a 100644 --- a/hoptimator-kafka/build.gradle +++ b/hoptimator-kafka/build.gradle @@ -5,6 +5,7 @@ plugins { dependencies { implementation project(':hoptimator-api') + implementation project(':hoptimator-avro') implementation project(':hoptimator-jdbc') implementation project(':hoptimator-util') implementation libs.calcite.core diff --git a/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/KafkaTopic.java b/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/KafkaTopic.java index 8b5f5571..b85e1162 100644 --- a/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/KafkaTopic.java +++ b/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/KafkaTopic.java @@ -1,5 +1,6 @@ package com.linkedin.hoptimator.kafka; +import com.linkedin.hoptimator.avro.AvroSchemas; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.impl.AbstractTable; @@ -22,7 +23,7 @@ public KafkaTopic(String topicName, Properties properties) { @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); - return builder.add("KEY", SqlTypeName.VARCHAR) + return builder.add(AvroSchemas.PRIMITIVE_KEY_NAME, SqlTypeName.VARCHAR) 
.nullable(true) .add("VALUE", SqlTypeName.BINARY) .nullable(true) diff --git a/hoptimator-mysql/build.gradle b/hoptimator-mysql/build.gradle index a571c611..fa3cf35b 100644 --- a/hoptimator-mysql/build.gradle +++ b/hoptimator-mysql/build.gradle @@ -4,6 +4,7 @@ plugins { dependencies { implementation project(':hoptimator-api') + implementation project(':hoptimator-avro') implementation project(':hoptimator-jdbc') implementation project(':hoptimator-util') implementation libs.calcite.core diff --git a/hoptimator-mysql/src/main/java/com/linkedin/hoptimator/mysql/MySqlDeployer.java b/hoptimator-mysql/src/main/java/com/linkedin/hoptimator/mysql/MySqlDeployer.java index d7aa4cbe..c2d996d4 100644 --- a/hoptimator-mysql/src/main/java/com/linkedin/hoptimator/mysql/MySqlDeployer.java +++ b/hoptimator-mysql/src/main/java/com/linkedin/hoptimator/mysql/MySqlDeployer.java @@ -4,6 +4,7 @@ import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.Validated; import com.linkedin.hoptimator.Validator; +import com.linkedin.hoptimator.avro.AvroSchemas; import com.linkedin.hoptimator.jdbc.HoptimatorConnection; import com.linkedin.hoptimator.jdbc.HoptimatorDriver; import org.apache.calcite.rel.type.RelDataType; @@ -37,8 +38,6 @@ public class MySqlDeployer implements Deployer, Validated { private static final Logger log = LoggerFactory.getLogger(MySqlDeployer.class); - private static final String KEY_PREFIX = "KEY_"; - private final Source source; private final Properties properties; private final HoptimatorConnection hoptimatorConnection; @@ -57,8 +56,8 @@ public MySqlDeployer(Source source, Properties properties, HoptimatorConnection private List parseKeyFields(RelDataType rowType) { List keyFields = new ArrayList<>(); for (RelDataTypeField field : rowType.getFieldList()) { - if (field.getName().startsWith(KEY_PREFIX)) { - String keyName = field.getName().substring(KEY_PREFIX.length()); + if (field.getName().startsWith(AvroSchemas.KEY_PREFIX)) { + String keyName = 
field.getName().substring(AvroSchemas.KEY_PREFIX.length()); keyFields.add(keyName); } } @@ -133,8 +132,8 @@ public void validate(Validator.Issues issues) { // Validate all column names for (RelDataTypeField field : newRowType.getFieldList()) { String fieldName = field.getName(); - String columnName = fieldName.startsWith(KEY_PREFIX) - ? fieldName.substring(KEY_PREFIX.length()) + String columnName = fieldName.startsWith(AvroSchemas.KEY_PREFIX) + ? fieldName.substring(AvroSchemas.KEY_PREFIX.length()) : fieldName; if (!isValidIdentifier(columnName)) { issues.error("Invalid column name: " + columnName); @@ -166,8 +165,8 @@ public void validate(Validator.Issues issues) { // Find the new type for this key field for (RelDataTypeField field : newRowType.getFieldList()) { String fieldName = field.getName(); - if (fieldName.startsWith(KEY_PREFIX)) { - String columnName = fieldName.substring(KEY_PREFIX.length()); + if (fieldName.startsWith(AvroSchemas.KEY_PREFIX)) { + String columnName = fieldName.substring(AvroSchemas.KEY_PREFIX.length()); if (columnName.equals(keyField)) { String newType = toMySqlType(field); String existingType = existingCol.type; @@ -508,8 +507,8 @@ private Map buildDesiredColumns(RelDataType rowType) throws String columnName; boolean isKey; - if (fieldName.startsWith(KEY_PREFIX)) { - columnName = fieldName.substring(KEY_PREFIX.length()); + if (fieldName.startsWith(AvroSchemas.KEY_PREFIX)) { + columnName = fieldName.substring(AvroSchemas.KEY_PREFIX.length()); isKey = true; } else { columnName = fieldName; @@ -558,9 +557,9 @@ private String buildCreateTableSql(String database, String tableName) throws SQL String fieldName = field.getName(); String columnName; - if (fieldName.startsWith(KEY_PREFIX)) { + if (fieldName.startsWith(AvroSchemas.KEY_PREFIX)) { // This is a key field - strip prefix and add to columns - columnName = fieldName.substring(KEY_PREFIX.length()); + columnName = fieldName.substring(AvroSchemas.KEY_PREFIX.length()); } else { // Regular 
column columnName = fieldName; diff --git a/hoptimator-util/build.gradle b/hoptimator-util/build.gradle index 9bab1483..ad8b0c9d 100644 --- a/hoptimator-util/build.gradle +++ b/hoptimator-util/build.gradle @@ -5,6 +5,8 @@ plugins { dependencies { implementation project(':hoptimator-api') + implementation project(':hoptimator-avro') + implementation libs.avro implementation libs.calcite.core implementation libs.yaml compileOnly libs.findbugs diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java index dabb7397..4ff5b502 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java @@ -1,8 +1,12 @@ package com.linkedin.hoptimator.util.planner; +import com.google.common.annotations.VisibleForTesting; +import com.linkedin.hoptimator.avro.AvroSchemaProvider; import com.linkedin.hoptimator.util.DataTypeUtils; +import org.apache.avro.Schema; import org.apache.calcite.adapter.java.AbstractQueryableTable; import org.apache.calcite.adapter.jdbc.JdbcTable; +import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.linq4j.Queryable; import org.apache.calcite.plan.RelOptCluster; @@ -15,14 +19,18 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.ModifiableTable; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; import org.apache.calcite.schema.TranslatableTable; +import javax.annotation.Nullable; +import java.sql.Connection; +import java.sql.SQLException; import java.util.Collection; import java.util.List; public class HoptimatorJdbcTable extends AbstractQueryableTable implements TranslatableTable, - ModifiableTable { + ModifiableTable, AvroSchemaProvider { private final 
JdbcTable jdbcTable; private final HoptimatorJdbcConvention convention; @@ -68,4 +76,55 @@ public Queryable asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName) { return jdbcTable.asQueryable(queryProvider, schema, tableName); } + + /** + * Returns the upstream's native Avro value schema when the underlying data source exposes one + * via {@link AvroSchemaProvider}. Returns {@code null} when the upstream is not a Calcite + * connection, when the upstream table cannot be located, or when it does not implement the + * interface. + * + *
<p>
Opens a new upstream JDBC connection on each call; callers should cache if invoked + * repeatedly. + */ + @Override + public @Nullable Schema valueSchema() { + Table upstream = upstreamTable(); + return upstream instanceof AvroSchemaProvider ? ((AvroSchemaProvider) upstream).valueSchema() : null; + } + + /** + * Returns the upstream's native Avro key schema when present, or {@code null} when the upstream + * table has no distinct key concept or does not implement {@link AvroSchemaProvider}. + */ + @Override + public @Nullable Schema keySchema() { + Table upstream = upstreamTable(); + return upstream instanceof AvroSchemaProvider ? ((AvroSchemaProvider) upstream).keySchema() : null; + } + + @VisibleForTesting + @Nullable Table upstreamTable() { + try (Connection upstream = jdbcTable.jdbcSchema.getDataSource().getConnection()) { + CalciteConnection calciteUpstream; + try { + calciteUpstream = upstream.unwrap(CalciteConnection.class); + } catch (SQLException e) { + return null; + } + SchemaPlus schema = calciteUpstream.getRootSchema(); + if (schema == null) { + return null; + } + if (jdbcTable.jdbcSchemaName != null) { + schema = schema.subSchemas().get(jdbcTable.jdbcSchemaName); + if (schema == null) { + return null; + } + } + return schema.tables().get(jdbcTable.jdbcTableName); + } catch (SQLException e) { + throw new RuntimeException( + "Failed to open upstream connection while reading Avro schema for " + jdbcTable.jdbcTableName, e); + } + } } diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableTest.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableTest.java index 088cfc45..9ee99602 100644 --- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableTest.java +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableTest.java @@ -1,5 +1,8 @@ package com.linkedin.hoptimator.util.planner; +import 
com.linkedin.hoptimator.avro.AvroSchemaProvider; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.calcite.adapter.jdbc.JdbcTable; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.linq4j.Queryable; @@ -13,6 +16,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractTable; import org.apache.calcite.sql.dialect.AnsiSqlDialect; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; @@ -97,6 +102,101 @@ void testToModificationRelDelegatesToJdbcTable() { mockInput, TableModify.Operation.INSERT, null, null, false); } + @Test + void valueSchemaReturnsNullWhenNoUpstreamTable() { + HoptimatorJdbcTable tableWithNullUpstream = new HoptimatorJdbcTable(mockJdbcTable, + new HoptimatorJdbcConvention(AnsiSqlDialect.DEFAULT, mockExpression, "db", + Collections.emptyList(), mockConnection)) { + @Override + Table upstreamTable() { + return null; + } + }; + assertNull(tableWithNullUpstream.valueSchema()); + assertNull(tableWithNullUpstream.keySchema()); + } + + @Test + void valueSchemaReturnsNullWhenUpstreamDoesNotImplementProvider() { + Table upstream = new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory factory) { + throw new UnsupportedOperationException(); + } + }; + HoptimatorJdbcTable wrapper = new HoptimatorJdbcTable(mockJdbcTable, + new HoptimatorJdbcConvention(AnsiSqlDialect.DEFAULT, mockExpression, "db", + Collections.emptyList(), mockConnection)) { + @Override + Table upstreamTable() { + return upstream; + } + }; + assertNull(wrapper.valueSchema()); + assertNull(wrapper.keySchema()); + } + + @Test + void valueAndKeySchemaDelegateToUpstreamProvider() { + Schema expectedValue = SchemaBuilder.record("Foo").namespace("com.linkedin.bar").fields() + 
.requiredString("v").endRecord(); + Schema expectedKey = SchemaBuilder.record("FooKey").namespace("com.linkedin.keyns").fields() + .requiredString("id").endRecord(); + Table upstream = new ProviderTable(expectedValue, expectedKey); + HoptimatorJdbcTable wrapper = new HoptimatorJdbcTable(mockJdbcTable, + new HoptimatorJdbcConvention(AnsiSqlDialect.DEFAULT, mockExpression, "db", + Collections.emptyList(), mockConnection)) { + @Override + Table upstreamTable() { + return upstream; + } + }; + assertSame(expectedValue, wrapper.valueSchema()); + assertSame(expectedKey, wrapper.keySchema()); + } + + @Test + void keySchemaReturnsNullWhenUpstreamHasNoKey() { + Schema valueOnly = SchemaBuilder.record("Foo").namespace("ns").fields() + .requiredString("v").endRecord(); + Table upstream = new ProviderTable(valueOnly, null); + HoptimatorJdbcTable wrapper = new HoptimatorJdbcTable(mockJdbcTable, + new HoptimatorJdbcConvention(AnsiSqlDialect.DEFAULT, mockExpression, "db", + Collections.emptyList(), mockConnection)) { + @Override + Table upstreamTable() { + return upstream; + } + }; + assertSame(valueOnly, wrapper.valueSchema()); + assertNull(wrapper.keySchema(), "upstream with no key should propagate null"); + } + + private static final class ProviderTable extends AbstractTable implements AvroSchemaProvider { + private final Schema value; + private final Schema key; + + ProviderTable(Schema value, Schema key) { + this.value = value; + this.key = key; + } + + @Override + public Schema valueSchema() { + return value; + } + + @Override + public Schema keySchema() { + return key; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory factory) { + throw new UnsupportedOperationException(); + } + } + @Test @SuppressWarnings("unchecked") void testAsQueryableDelegatesToJdbcTable() { diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java index 
2c55c9a5..6bee4613 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java @@ -1,6 +1,8 @@ package com.linkedin.hoptimator.venice; import com.linkedin.hoptimator.avro.AvroConverter; +import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemas; import com.linkedin.hoptimator.util.DataTypeUtils; import com.linkedin.venice.client.schema.StoreSchemaFetcher; import org.apache.avro.Schema; @@ -11,9 +13,7 @@ /** A batch of records from a Venice store. */ -public class VeniceStore extends AbstractTable { - - private static final String KEY_PREFIX = "KEY_"; +public class VeniceStore extends AbstractTable implements AvroSchemaProvider { private final StoreSchemaFetcher storeSchemaFetcher; private final Integer valueSchemaId; @@ -26,12 +26,7 @@ public VeniceStore(StoreSchemaFetcher storeSchemaFetcher, VeniceStoreConfig stor @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { Schema keySchema = storeSchemaFetcher.getKeySchema(); - Schema valueSchema; - if (valueSchemaId != null) { - valueSchema = storeSchemaFetcher.getValueSchema(valueSchemaId); - } else { - valueSchema = storeSchemaFetcher.getLatestValueSchema(); - } + Schema valueSchema = fetchValueSchema(); // Venice contains both a key schema and a value schema. Since we need to pass back one joint schema, // and to avoid name collisions, all key fields are flattened as "KEY_foo". 
@@ -41,17 +36,34 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); if (key.isStruct()) { for (RelDataTypeField field: key.getFieldList()) { - builder.add(KEY_PREFIX + field.getName(), field.getType()); + builder.add(AvroSchemas.KEY_PREFIX + field.getName(), field.getType()); } } else { - builder.add("KEY", key); + builder.add(AvroSchemas.PRIMITIVE_KEY_NAME, key); } builder.addAll(value.getFieldList()); RelDataType combinedSchema = builder.build(); return DataTypeUtils.flatten(combinedSchema, typeFactory); } + @Override + public Schema valueSchema() { + return fetchValueSchema(); + } + + @Override + public Schema keySchema() { + return storeSchemaFetcher.getKeySchema(); + } + protected RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { return AvroConverter.rel(schema, typeFactory); } + + private Schema fetchValueSchema() { + if (valueSchemaId != null) { + return storeSchemaFetcher.getValueSchema(valueSchemaId); + } + return storeSchemaFetcher.getLatestValueSchema(); + } } diff --git a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceStoreTest.java b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceStoreTest.java index 87362ea5..acd29ec2 100644 --- a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceStoreTest.java +++ b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceStoreTest.java @@ -17,6 +17,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -132,4 +133,87 @@ void testGetRowTypeFieldCount() { // 2 key fields (KEY_k1, KEY_k2) + 1 value field (v1) assertEquals(3, rowType.getFieldCount()); } + + // --- 
valueSchema / keySchema wiring tests. Merging logic is covered by AvroSchemasTest. --- + + @Test + void valueSchemaReturnsLatestValueSchemaAsIs() { + // valueSchema() returns the raw payload from the fetcher — no cloning, no KEY_ fields. This + // is what connector payload options (e.g. Flink's default.mode.payload) render. + Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() + .requiredString("name").requiredInt("age").endRecord(); + + when(mockSchemaFetcher.getLatestValueSchema()).thenReturn(valueSchema); + + VeniceStore store = new VeniceStore(mockSchemaFetcher, + new VeniceStoreConfig(Collections.emptyMap())); + + assertSame(valueSchema, store.valueSchema(), "valueSchema() returns the raw value schema"); + } + + @Test + void keySchemaReturnsKeySchemaAsIs() { + Schema keySchema = SchemaBuilder.record("Key").namespace("com.linkedin.keyns").fields() + .requiredString("id").endRecord(); + + when(mockSchemaFetcher.getKeySchema()).thenReturn(keySchema); + + VeniceStore store = new VeniceStore(mockSchemaFetcher, + new VeniceStoreConfig(Collections.emptyMap())); + + assertSame(keySchema, store.keySchema(), "keySchema() returns the raw key schema"); + } + + @Test + void keySchemaReturnsPrimitiveKeyAsPrimitive() { + // Primitive keys come back as primitive Schemas (not wrapped in a synthetic record) so the + // merge helper can apply its primitive-key logic (single "KEY" field). 
+ Schema keySchema = Schema.create(Schema.Type.STRING); + + when(mockSchemaFetcher.getKeySchema()).thenReturn(keySchema); + + VeniceStore store = new VeniceStore(mockSchemaFetcher, + new VeniceStoreConfig(Collections.emptyMap())); + + Schema result = store.keySchema(); + assertSame(keySchema, result); + assertEquals(Schema.Type.STRING, result.getType()); + } + + @Test + void valueSchemaUsesConfiguredValueSchemaId() { + Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() + .requiredString("name").endRecord(); + + when(mockSchemaFetcher.getValueSchema(7)).thenReturn(valueSchema); + + VeniceStoreConfig config = new VeniceStoreConfig(Map.of(VeniceStoreConfig.KEY_VALUE_SCHEMA_ID, "7")); + VeniceStore store = new VeniceStore(mockSchemaFetcher, config); + + assertSame(valueSchema, store.valueSchema()); + verify(mockSchemaFetcher).getValueSchema(7); + } + + @Test + void mergedAvroSchemaHelperCombinesKeyAndValue() { + // End-to-end: AvroSchemas.mergedAvroSchemaFor on a VeniceStore yields the KEY_-prefixed view. 
+ Schema keySchema = SchemaBuilder.record("Key").namespace("com.linkedin.keyns").fields() + .requiredString("id").endRecord(); + Schema valueSchema = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() + .requiredString("name").endRecord(); + + when(mockSchemaFetcher.getKeySchema()).thenReturn(keySchema); + when(mockSchemaFetcher.getLatestValueSchema()).thenReturn(valueSchema); + + VeniceStore store = new VeniceStore(mockSchemaFetcher, + new VeniceStoreConfig(Collections.emptyMap())); + + Schema merged = com.linkedin.hoptimator.avro.AvroSchemas.mergedAvroSchemaFor(store); + + assertEquals("com.linkedin.foo", merged.getNamespace()); + assertEquals("User", merged.getName()); + assertEquals(2, merged.getFields().size()); + assertEquals("KEY_id", merged.getFields().get(0).name()); + assertEquals("name", merged.getFields().get(1).name()); + } } From 7bb358354dbb3e6a489181957f0e67f78373446d Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Fri, 1 May 2026 10:22:20 -0400 Subject: [PATCH 2/2] Rename AvroSchemaProvider --- ...emaProvider.java => AvroSchemaSource.java} | 4 +-- .../linkedin/hoptimator/avro/AvroSchemas.java | 12 +++---- .../hoptimator/jdbc/HoptimatorConnection.java | 23 +++++++------ .../jdbc/HoptimatorConnectionTest.java | 32 +++++++++---------- .../linkedin/hoptimator/k8s/K8sConnector.java | 8 ++--- .../hoptimator/k8s/K8sConnectorTest.java | 18 +++++------ .../util/planner/HoptimatorJdbcTable.java | 12 +++---- .../util/planner/HoptimatorJdbcTableTest.java | 14 ++++---- .../hoptimator/venice/VeniceStore.java | 4 +-- 9 files changed, 63 insertions(+), 64 deletions(-) rename hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/{AvroSchemaProvider.java => AvroSchemaSource.java} (93%) diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaProvider.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaSource.java similarity index 93% rename from 
hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaProvider.java rename to hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaSource.java index e04f68f5..6a537930 100644 --- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaProvider.java +++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemaSource.java @@ -18,7 +18,7 @@ * a primitive {@link Schema}. * */ -public interface AvroSchemaProvider { +public interface AvroSchemaSource { /** * Returns the value/payload Avro schema — the data record's schema without any query-layer @@ -31,7 +31,7 @@ public interface AvroSchemaProvider { * Returns the key Avro schema, or {@code null} when this table has no distinct key concept. * Struct keys expose their fields directly; primitive keys return a primitive {@link Schema}. * SQL/query-layer consumers may merge this with {@link #valueSchema()} via - * {@link AvroSchemas#mergedAvroSchemaFor(AvroSchemaProvider)}. + * {@link AvroSchemas#mergedAvroSchemaFor(AvroSchemaSource)}. */ default Schema keySchema() { return null; diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemas.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemas.java index aa2df42d..66f96dad 100644 --- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemas.java +++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroSchemas.java @@ -49,18 +49,18 @@ public static Schema.Field cloneField(String name, Schema.Field original) { } /** - * Produces the merged Avro schema for an {@link AvroSchemaProvider}: the value schema with key + * Produces the merged Avro schema for an {@link AvroSchemaSource}: the value schema with key * fields prepended using Hoptimator's standard convention ({@link #KEY_PREFIX} for struct keys, - * {@link #PRIMITIVE_KEY_NAME} field for primitive keys). 
When the provider has no key - * ({@link AvroSchemaProvider#keySchema()} returns {@code null}), returns the value schema + * {@link #PRIMITIVE_KEY_NAME} field for primitive keys). When the source has no key + * ({@link AvroSchemaSource#keySchema()} returns {@code null}), returns the value schema * unchanged. * *
<p>
Centralizes the Hoptimator merging convention so SQL/query-layer consumers (like * {@code HoptimatorConnection.resolve()}) share one implementation. */ - public static Schema mergedAvroSchemaFor(AvroSchemaProvider provider) { - Schema key = provider.keySchema(); - Schema value = provider.valueSchema(); + public static Schema mergedAvroSchemaFor(AvroSchemaSource source) { + Schema key = source.keySchema(); + Schema value = source.valueSchema(); if (key == null) { return value; } diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java index 597ea79d..143c48f6 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java @@ -4,7 +4,7 @@ import com.linkedin.hoptimator.Sink; import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.avro.AvroConverter; -import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemaSource; import com.linkedin.hoptimator.avro.AvroSchemas; import com.linkedin.hoptimator.util.ConnectionService; import com.linkedin.hoptimator.util.DelegatingConnection; @@ -51,7 +51,7 @@ public HoptimatorConnection(CalciteConnection connection, Properties connectionP public ResolvedTable resolve(List tablePath, Map hints) throws SQLException { try { - Schema avroSchema = providerSchema(tablePath); + Schema avroSchema = avroSchema(tablePath); if (avroSchema == null) { String tableSql = "SELECT * FROM " + tablePath.stream() .map(x -> "\"" + x + "\"") @@ -76,21 +76,20 @@ public ResolvedTable resolve(List tablePath, Map hints) } /** - * Returns the upstream provider's merged Avro schema (value + prefixed key fields where - * applicable) when the resolved table at {@code tablePath} implements - * {@link AvroSchemaProvider}, or {@code null} when no provider is 
available and the caller - * should fall back to RelDataType synthesis. + * Returns the native merged Avro schema (value + prefixed key fields where applicable) when the + * resolved table at {@code tablePath} implements {@link AvroSchemaSource}, or {@code null} when + * no source is available and the caller should fall back to RelDataType synthesis. * *
<p>
{@code resolve()} merges key+value because its consumers (the SQL/query layer, the Flink * catalog, the {@code !resolve} CLI command) expect keys to appear as columns in the table's * logical schema. Connector deployers, which render connector payload options, call - * {@link AvroSchemaProvider#valueSchema()} directly instead. + * {@link AvroSchemaSource#valueSchema()} directly instead. */ - private Schema providerSchema(List tablePath) { - return providerSchemaAt(connection.getRootSchema(), tablePath); + private Schema avroSchema(List tablePath) { + return avroSchemaAt(connection.getRootSchema(), tablePath); } - static Schema providerSchemaAt(SchemaPlus root, List tablePath) { + static Schema avroSchemaAt(SchemaPlus root, List tablePath) { SchemaPlus schema = root; for (String part : Util.skipLast(tablePath)) { if (schema == null) { @@ -102,8 +101,8 @@ static Schema providerSchemaAt(SchemaPlus root, List tablePath) { return null; } Table table = schema.tables().get(tablePath.get(tablePath.size() - 1)); - return table instanceof AvroSchemaProvider - ? AvroSchemas.mergedAvroSchemaFor((AvroSchemaProvider) table) : null; + return table instanceof AvroSchemaSource + ? 
AvroSchemas.mergedAvroSchemaFor((AvroSchemaSource) table) : null; } @Override diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorConnectionTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorConnectionTest.java index b0a041e3..961b4f73 100644 --- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorConnectionTest.java +++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorConnectionTest.java @@ -1,7 +1,7 @@ package com.linkedin.hoptimator.jdbc; import com.linkedin.hoptimator.Source; -import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemaSource; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -218,14 +218,14 @@ void resolveReturnsNonNullTypeForExistingTwoPartPath() throws SQLException { } @Test - void providerSchemaAtReturnsNullForUnknownPath() { + void avroSchemaAtReturnsNullForUnknownPath() { SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); - assertNull(HoptimatorConnection.providerSchemaAt(root, + assertNull(HoptimatorConnection.avroSchemaAt(root, List.of("missingDb", "missingTable"))); } @Test - void providerSchemaAtReturnsNullWhenTableIsNotProvider() { + void avroSchemaAtReturnsNullWhenTableIsNotSource() { SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); SchemaPlus db = root.add("db", new AbstractSchema()); db.add("plain", new AbstractTable() { @@ -235,25 +235,25 @@ public RelDataType getRowType(RelDataTypeFactory factory) { } }); - assertNull(HoptimatorConnection.providerSchemaAt(root, List.of("db", "plain"))); + assertNull(HoptimatorConnection.avroSchemaAt(root, List.of("db", "plain"))); } @Test - void providerSchemaAtReturnsValueSchemaWhenProviderHasNoKey() { + void avroSchemaAtReturnsValueSchemaWhenSourceHasNoKey() { // No key → the merge helper returns the value schema unchanged. 
Schema value = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() .requiredString("name").endRecord(); SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); SchemaPlus db = root.add("db", new AbstractSchema()); - db.add("user", new ProviderTable(value, null)); + db.add("user", new SourceTable(value, null)); - Schema result = HoptimatorConnection.providerSchemaAt(root, List.of("db", "user")); + Schema result = HoptimatorConnection.avroSchemaAt(root, List.of("db", "user")); assertSame(value, result, "no key → merged is just the value schema"); } @Test - void providerSchemaAtMergesKeyAndValueWhenProviderExposesBoth() { + void avroSchemaAtMergesKeyAndValueWhenSourceExposesBoth() { // Both key and value → merged view with KEY_-prefixed key fields prepended before value. // resolve() uses this so SQL queries can reference key columns by name. Schema value = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() @@ -262,9 +262,9 @@ void providerSchemaAtMergesKeyAndValueWhenProviderExposesBoth() { .requiredString("id").endRecord(); SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); SchemaPlus db = root.add("db", new AbstractSchema()); - db.add("user", new ProviderTable(value, key)); + db.add("user", new SourceTable(value, key)); - Schema result = HoptimatorConnection.providerSchemaAt(root, List.of("db", "user")); + Schema result = HoptimatorConnection.avroSchemaAt(root, List.of("db", "user")); assertNotNull(result); assertEquals(2, result.getFields().size()); @@ -276,15 +276,15 @@ void providerSchemaAtMergesKeyAndValueWhenProviderExposesBoth() { } @Test - void providerSchemaAtMergesPrimitiveKeyAsSingleKeyField() { + void avroSchemaAtMergesPrimitiveKeyAsSingleKeyField() { Schema value = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() .requiredString("name").endRecord(); Schema key = Schema.create(Schema.Type.STRING); SchemaPlus root = CalciteSchema.createRootSchema(false).plus(); SchemaPlus db = 
root.add("db", new AbstractSchema()); - db.add("user", new ProviderTable(value, key)); + db.add("user", new SourceTable(value, key)); - Schema result = HoptimatorConnection.providerSchemaAt(root, List.of("db", "user")); + Schema result = HoptimatorConnection.avroSchemaAt(root, List.of("db", "user")); assertNotNull(result); assertEquals(2, result.getFields().size()); @@ -293,11 +293,11 @@ void providerSchemaAtMergesPrimitiveKeyAsSingleKeyField() { assertEquals("name", result.getFields().get(1).name()); } - private static final class ProviderTable extends AbstractTable implements AvroSchemaProvider { + private static final class SourceTable extends AbstractTable implements AvroSchemaSource { private final Schema value; private final Schema key; - ProviderTable(Schema value, Schema key) { + SourceTable(Schema value, Schema key) { this.value = value; this.key = key; } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java index 6ca2e304..fe5145a5 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java @@ -5,7 +5,7 @@ import com.linkedin.hoptimator.Sink; import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.avro.AvroConverter; -import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemaSource; import com.linkedin.hoptimator.avro.AvroSchemas; import com.linkedin.hoptimator.jdbc.HoptimatorDriver; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate; @@ -116,15 +116,15 @@ private Map getConnectorHints(Map options, Strin /** * Renders the value Avro schema for the {@code {{avroValueSchema}}} template variable. 
Prefers - * the upstream table's native value schema when it implements {@link AvroSchemaProvider} — keys + * the upstream table's native value schema when it implements {@link AvroSchemaSource} — keys * aren't included, because the connector handles them separately via {@code key.fields}. Falls * back to synthesizing from the flat row type, which loses source-level namespaces and nested * record identities. */ private Schema avroValueSchema(Source source, RelDataType sourceRowType) { Table table = lookupTable(source); - if (table instanceof AvroSchemaProvider) { - Schema provided = ((AvroSchemaProvider) table).valueSchema(); + if (table instanceof AvroSchemaSource) { + Schema provided = ((AvroSchemaSource) table).valueSchema(); if (provided != null) { return provided; } diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sConnectorTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sConnectorTest.java index fd8bc177..422bf7b7 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sConnectorTest.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sConnectorTest.java @@ -2,7 +2,7 @@ import com.linkedin.hoptimator.Sink; import com.linkedin.hoptimator.Source; -import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemaSource; import com.linkedin.hoptimator.jdbc.HoptimatorConnection; import com.linkedin.hoptimator.jdbc.HoptimatorDriver; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate; @@ -357,8 +357,8 @@ K8sApi createTableTemplateApi( } @Test - void configureAvroValueSchemaUsesProviderWhenAvailable() throws SQLException { - // When the resolved table implements AvroSchemaProvider, its native Avro schema is rendered + void configureAvroValueSchemaUsesSourceWhenAvailable() throws SQLException { + // When the resolved table implements AvroSchemaSource, its native Avro schema is rendered // into the template as-is — no round-trip through 
RelDataType. RelDataType rowType = new RelDataTypeFactory.Builder(typeFactory) .add("KEY_id", SqlTypeName.VARCHAR) @@ -366,13 +366,13 @@ void configureAvroValueSchemaUsesProviderWhenAvailable() throws SQLException { hoptimatorDriverMock.when(() -> HoptimatorDriver.rowType(any(Source.class), any())) .thenReturn(rowType); - Schema nativeSchema = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() + Schema avroSchema = SchemaBuilder.record("User").namespace("com.linkedin.foo").fields() .requiredString("KEY_id") .requiredString("name") .endRecord(); Source source = new Source("testdb", Arrays.asList("schema", "table"), Collections.emptyMap()); - installRootSchemaWithTable(source, new ProviderTable(nativeSchema)); + installRootSchemaWithTable(source, new SourceTable(avroSchema)); FakeK8sApi templateApi = new FakeK8sApi<>( List.of(new V1alpha1TableTemplate() @@ -393,8 +393,8 @@ void configureAvroValueSchemaUsesProviderWhenAvailable() throws SQLException { } @Test - void configureAvroValueSchemaFallsBackToRowTypeSynthesisWhenNoProvider() throws SQLException { - // No provider on the resolved table → synthesize from the row type using AvroConverter.avro. + void configureAvroValueSchemaFallsBackToRowTypeSynthesisWhenNoSource() throws SQLException { + // No AvroSchemaSource on the resolved table → synthesize from the row type using AvroConverter.avro. 
RelDataType rowType = new RelDataTypeFactory.Builder(typeFactory) .add("name", SqlTypeName.VARCHAR).build(); hoptimatorDriverMock.when(() -> HoptimatorDriver.rowType(any(Source.class), any())) @@ -439,10 +439,10 @@ public RelDataType getRowType(RelDataTypeFactory factory) { } } - private static final class ProviderTable extends AbstractTable implements AvroSchemaProvider { + private static final class SourceTable extends AbstractTable implements AvroSchemaSource { private final Schema value; - ProviderTable(Schema value) { + SourceTable(Schema value) { this.value = value; } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java index 4ff5b502..3c853c63 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java @@ -1,7 +1,7 @@ package com.linkedin.hoptimator.util.planner; import com.google.common.annotations.VisibleForTesting; -import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemaSource; import com.linkedin.hoptimator.util.DataTypeUtils; import org.apache.avro.Schema; import org.apache.calcite.adapter.java.AbstractQueryableTable; @@ -30,7 +30,7 @@ public class HoptimatorJdbcTable extends AbstractQueryableTable implements TranslatableTable, - ModifiableTable, AvroSchemaProvider { + ModifiableTable, AvroSchemaSource { private final JdbcTable jdbcTable; private final HoptimatorJdbcConvention convention; @@ -79,7 +79,7 @@ public Queryable asQueryable(QueryProvider queryProvider, /** * Returns the upstream's native Avro value schema when the underlying data source exposes one - * via {@link AvroSchemaProvider}. Returns {@code null} when the upstream is not a Calcite + * via {@link AvroSchemaSource}. 
Returns {@code null} when the upstream is not a Calcite * connection, when the upstream table cannot be located, or when it does not implement the * interface. * @@ -89,17 +89,17 @@ public Queryable asQueryable(QueryProvider queryProvider, @Override public @Nullable Schema valueSchema() { Table upstream = upstreamTable(); - return upstream instanceof AvroSchemaProvider ? ((AvroSchemaProvider) upstream).valueSchema() : null; + return upstream instanceof AvroSchemaSource ? ((AvroSchemaSource) upstream).valueSchema() : null; } /** * Returns the upstream's native Avro key schema when present, or {@code null} when the upstream - * table has no distinct key concept or does not implement {@link AvroSchemaProvider}. + * table has no distinct key concept or does not implement {@link AvroSchemaSource}. */ @Override public @Nullable Schema keySchema() { Table upstream = upstreamTable(); - return upstream instanceof AvroSchemaProvider ? ((AvroSchemaProvider) upstream).keySchema() : null; + return upstream instanceof AvroSchemaSource ? 
((AvroSchemaSource) upstream).keySchema() : null; } @VisibleForTesting diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableTest.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableTest.java index 9ee99602..87065624 100644 --- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableTest.java +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableTest.java @@ -1,6 +1,6 @@ package com.linkedin.hoptimator.util.planner; -import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemaSource; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.calcite.adapter.jdbc.JdbcTable; @@ -117,7 +117,7 @@ Table upstreamTable() { } @Test - void valueSchemaReturnsNullWhenUpstreamDoesNotImplementProvider() { + void valueSchemaReturnsNullWhenUpstreamDoesNotImplementSource() { Table upstream = new AbstractTable() { @Override public RelDataType getRowType(RelDataTypeFactory factory) { @@ -137,12 +137,12 @@ Table upstreamTable() { } @Test - void valueAndKeySchemaDelegateToUpstreamProvider() { + void valueAndKeySchemaDelegateToUpstreamSource() { Schema expectedValue = SchemaBuilder.record("Foo").namespace("com.linkedin.bar").fields() .requiredString("v").endRecord(); Schema expectedKey = SchemaBuilder.record("FooKey").namespace("com.linkedin.keyns").fields() .requiredString("id").endRecord(); - Table upstream = new ProviderTable(expectedValue, expectedKey); + Table upstream = new SourceTable(expectedValue, expectedKey); HoptimatorJdbcTable wrapper = new HoptimatorJdbcTable(mockJdbcTable, new HoptimatorJdbcConvention(AnsiSqlDialect.DEFAULT, mockExpression, "db", Collections.emptyList(), mockConnection)) { @@ -159,7 +159,7 @@ Table upstreamTable() { void keySchemaReturnsNullWhenUpstreamHasNoKey() { Schema valueOnly = 
SchemaBuilder.record("Foo").namespace("ns").fields() .requiredString("v").endRecord(); - Table upstream = new ProviderTable(valueOnly, null); + Table upstream = new SourceTable(valueOnly, null); HoptimatorJdbcTable wrapper = new HoptimatorJdbcTable(mockJdbcTable, new HoptimatorJdbcConvention(AnsiSqlDialect.DEFAULT, mockExpression, "db", Collections.emptyList(), mockConnection)) { @@ -172,11 +172,11 @@ Table upstreamTable() { assertNull(wrapper.keySchema(), "upstream with no key should propagate null"); } - private static final class ProviderTable extends AbstractTable implements AvroSchemaProvider { + private static final class SourceTable extends AbstractTable implements AvroSchemaSource { private final Schema value; private final Schema key; - ProviderTable(Schema value, Schema key) { + SourceTable(Schema value, Schema key) { this.value = value; this.key = key; } diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java index 6bee4613..245432e1 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java @@ -1,7 +1,7 @@ package com.linkedin.hoptimator.venice; import com.linkedin.hoptimator.avro.AvroConverter; -import com.linkedin.hoptimator.avro.AvroSchemaProvider; +import com.linkedin.hoptimator.avro.AvroSchemaSource; import com.linkedin.hoptimator.avro.AvroSchemas; import com.linkedin.hoptimator.util.DataTypeUtils; import com.linkedin.venice.client.schema.StoreSchemaFetcher; @@ -13,7 +13,7 @@ /** A batch of records from a Venice store. */ -public class VeniceStore extends AbstractTable implements AvroSchemaProvider { +public class VeniceStore extends AbstractTable implements AvroSchemaSource { private final StoreSchemaFetcher storeSchemaFetcher; private final Integer valueSchemaId;