From 2281635249d8d15d5a09e500b83a8f8918cba7d6 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Thu, 21 Nov 2024 14:56:30 +0100 Subject: [PATCH 1/2] feat: support registry for primitive types (#1) --- .../ConfluentRegistryAvroSerializationSchema.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java index b3b574c27be57..fc2a9cefaf1f0 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java +++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java @@ -140,4 +140,19 @@ public static ConfluentRegistryAvroSerializationSchema forGeneric DEFAULT_IDENTITY_MAP_CAPACITY, registryConfigs)); } + + public static ConfluentRegistryAvroSerializationSchema forPrimitiveType( + String subject, + Schema schema, + String schemaRegistryUrl, + @Nullable Map registryConfigs) { + return new ConfluentRegistryAvroSerializationSchema<>( + Object.class, + schema, + new CachedSchemaCoderProvider( + subject, + schemaRegistryUrl, + DEFAULT_IDENTITY_MAP_CAPACITY, + registryConfigs)); + } } From 5ec1d9f46ff0e4201058500673988b51951f8418 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Thu, 18 Sep 2025 10:02:50 +0200 Subject: [PATCH 2/2] add instanciation test --- ...ntRegistryAvroSerializationSchemaTest.java | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchemaTest.java diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchemaTest.java new file mode 100644 index 0000000000000..229c574e671f9 --- /dev/null +++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchemaTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.generated.Address; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ConfluentRegistryAvroSerializationSchema}. */ +class ConfluentRegistryAvroSerializationSchemaTest { + + private static final String SUBJECT = "test-subject"; + private static final String SCHEMA_REGISTRY_URL = "someUrl"; + + @Test + void testForPrimitiveTypeWithStringSchema() { + Schema stringSchema = Schema.create(Schema.Type.STRING); + + ConfluentRegistryAvroSerializationSchema schema = + ConfluentRegistryAvroSerializationSchema.forPrimitiveType( + SUBJECT, stringSchema, SCHEMA_REGISTRY_URL, null); + + assertThat(schema).isNotNull(); + } + + @Test + void testForPrimitiveTypeWithStringSchemaAndConfigs() { + Schema stringSchema = Schema.create(Schema.Type.STRING); + Map configs = new HashMap<>(); + configs.put("basic.auth.credentials.source", "USER_INFO"); + configs.put("basic.auth.user.info", "user:password"); + + ConfluentRegistryAvroSerializationSchema schema = + ConfluentRegistryAvroSerializationSchema.forPrimitiveType( + SUBJECT, stringSchema, SCHEMA_REGISTRY_URL, configs); + + assertThat(schema).isNotNull(); + } + + @Test + void testForPrimitiveTypeWithIntSchema() { + Schema intSchema = Schema.create(Schema.Type.INT); + + ConfluentRegistryAvroSerializationSchema schema = + ConfluentRegistryAvroSerializationSchema.forPrimitiveType( + SUBJECT, intSchema, SCHEMA_REGISTRY_URL, null); + + assertThat(schema).isNotNull(); + } + + @Test + void testForPrimitiveTypeWithLongSchema() { + Schema longSchema = Schema.create(Schema.Type.LONG); + + ConfluentRegistryAvroSerializationSchema schema = + ConfluentRegistryAvroSerializationSchema.forPrimitiveType( + SUBJECT, longSchema, SCHEMA_REGISTRY_URL, null); + + assertThat(schema).isNotNull(); + } + + @Test + void testForPrimitiveTypeWithBooleanSchema() { + Schema booleanSchema = Schema.create(Schema.Type.BOOLEAN); + + ConfluentRegistryAvroSerializationSchema schema = + ConfluentRegistryAvroSerializationSchema.forPrimitiveType( + SUBJECT, booleanSchema, SCHEMA_REGISTRY_URL, null); + + assertThat(schema).isNotNull(); + } + + @Test + void testForPrimitiveTypeWithFloatSchema() { + Schema floatSchema = Schema.create(Schema.Type.FLOAT); + + ConfluentRegistryAvroSerializationSchema schema = + ConfluentRegistryAvroSerializationSchema.forPrimitiveType( + SUBJECT, floatSchema, SCHEMA_REGISTRY_URL, null); + + assertThat(schema).isNotNull(); + } + + @Test + void testForPrimitiveTypeWithDoubleSchema() { + Schema doubleSchema = Schema.create(Schema.Type.DOUBLE); + + ConfluentRegistryAvroSerializationSchema schema = + ConfluentRegistryAvroSerializationSchema.forPrimitiveType( + SUBJECT, doubleSchema, SCHEMA_REGISTRY_URL, null); + + assertThat(schema).isNotNull(); + } + + @Test + void testForPrimitiveTypeInstantiation() throws Exception { + Schema stringSchema = Schema.create(Schema.Type.STRING); + + ConfluentRegistryAvroSerializationSchema schema = + ConfluentRegistryAvroSerializationSchema.forPrimitiveType( + SUBJECT, stringSchema, SCHEMA_REGISTRY_URL, null); + + assertThat(schema).isNotNull(); + // Test that the schema was created with correct parameters + // We can't test actual serialization without a running schema registry, + // but we can verify the factory method works correctly + } + + @Test + void testForPrimitiveTypeConsistencyWithFactoryMethods() { + Schema stringSchema = Schema.create(Schema.Type.STRING); + GenericRecord genericRecord = new GenericData.Record(createRecordSchema()); + + ConfluentRegistryAvroSerializationSchema primitiveSchema = + ConfluentRegistryAvroSerializationSchema.forPrimitiveType( + SUBJECT, stringSchema, SCHEMA_REGISTRY_URL, null); + + ConfluentRegistryAvroSerializationSchema genericSchema = + ConfluentRegistryAvroSerializationSchema.forGeneric( + SUBJECT, createRecordSchema(), SCHEMA_REGISTRY_URL, null); + + ConfluentRegistryAvroSerializationSchema
specificSchema = + ConfluentRegistryAvroSerializationSchema.forSpecific( + Address.class, SUBJECT, SCHEMA_REGISTRY_URL, null); + + assertThat(primitiveSchema).isNotNull(); + assertThat(genericSchema).isNotNull(); + assertThat(specificSchema).isNotNull(); + + assertThat(primitiveSchema.getClass()).isEqualTo(genericSchema.getClass()); + assertThat(primitiveSchema.getClass()).isEqualTo(specificSchema.getClass()); + } + + private Schema createRecordSchema() { + return Schema.createRecord( + "TestRecord", + null, + "org.apache.flink.test", + false, + java.util.Arrays.asList( + new Schema.Field( + "testField", Schema.create(Schema.Type.STRING), null, null))); + } +}