From c5614e975793755c32baec9a44e8aa962307db3d Mon Sep 17 00:00:00 2001 From: ethan-xiao <308056554@qq.com> Date: Thu, 27 Mar 2025 09:37:56 +0800 Subject: [PATCH] [hotfix] Supports specifying routing fields during write operations. Parameter name: sink.partition-routing.fields --- .../table/ElasticsearchConfiguration.java | 4 + .../table/ElasticsearchConnectorOptions.java | 6 ++ .../elasticsearch/table/KeyExtractor.java | 37 +++++++++ .../elasticsearch/table/RequestFactory.java | 8 +- .../table/RowElasticsearchSinkFunction.java | 17 ++-- .../elasticsearch/table/KeyExtractorTest.java | 80 +++++++++++++++++++ .../table/Elasticsearch6DynamicSink.java | 28 +++++-- .../table/Elasticsearch7DynamicSink.java | 28 +++++-- .../Elasticsearch7DynamicTableFactory.java | 4 +- 9 files changed, 186 insertions(+), 26 deletions(-) diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java index 04c76333..d2c47431 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java @@ -150,6 +150,10 @@ public Optional getPathPrefix() { return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX); } + public String getPartitionRoutingFields() { + return config.get(ElasticsearchConnectorOptions.PARTITION_ROUTING_FIELDS); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java index 4838b035..dfa8c19d 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -152,6 +152,12 @@ public class ElasticsearchConnectorOptions { "The format must produce a valid JSON document. " + "Please refer to the documentation on formats for more details."); + public static final ConfigOption PARTITION_ROUTING_FIELDS = + ConfigOptions.key("sink.partition-routing.fields") + .stringType() + .noDefaultValue() + .withDescription("Route field names list, multiple separated by commas."); + // -------------------------------------------------------------------------------------------- // Enums // -------------------------------------------------------------------------------------------- diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java index ae7c522b..f50d28f4 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java @@ -24,15 +24,19 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.StringUtils; import java.io.Serializable; import java.time.Duration; import java.time.LocalDate; import java.time.LocalTime; import java.time.Period; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; /** An extractor for a Elasticsearch key from a {@link RowData}. */ @@ -109,6 +113,39 @@ public static Function createKeyExtractor( .orElseGet(() -> (Function & Serializable) (row) -> null); } + public static Function createColumnExtractor( + TableSchema schema, String keyDelimiter, String columns) { + List cols = null; + if (StringUtils.isNullOrWhitespaceOnly(columns)) { + cols = new ArrayList<>(0); + } else { + cols = Arrays.asList(columns.split(",")); + } + return createColumnExtractor(schema, keyDelimiter, cols); + } + + public static Function createColumnExtractor( + TableSchema schema, String keyDelimiter, List columns) { + Map namesToColumns = new HashMap<>(); + List tableColumns = schema.getTableColumns(); + for (int i = 0; i < schema.getFieldCount(); i++) { + TableColumn column = tableColumns.get(i); + namesToColumns.put(column.getName(), new ColumnWithIndex(column, i)); + } + + FieldFormatter[] fieldFormatters = columns == null || columns.isEmpty() ? new FieldFormatter[0] : + columns.stream() + .map(namesToColumns::get) + .map( + column -> + toFormatter( + column.index, column.getType())) + .toArray(FieldFormatter[]::new); + + Function extractor = new KeyExtractor(fieldFormatters, keyDelimiter); + return Optional.of(extractor).orElseGet(() -> (Function & Serializable) (row) -> null); + } + private static FieldFormatter toFormatter(int index, LogicalType type) { switch (type.getTypeRoot()) { case DATE: diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java index f5b24180..2eea7c56 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java @@ -36,19 +36,17 @@ interface RequestFactory extends Serializable { * Creates an update request to be added to a {@link RequestIndexer}. Note: the type field has * been deprecated since Elasticsearch 7.x and it would not take any effort. */ - UpdateRequest createUpdateRequest( - String index, String docType, String key, XContentType contentType, byte[] document); + UpdateRequest createUpdateRequest(String index, String docType, String key, String routing, XContentType contentType, byte[] document); /** * Creates an index request to be added to a {@link RequestIndexer}. Note: the type field has * been deprecated since Elasticsearch 7.x and it would not take any effort. */ - IndexRequest createIndexRequest( - String index, String docType, String key, XContentType contentType, byte[] document); + IndexRequest createIndexRequest(String index, String docType, String key, String routing, XContentType contentType, byte[] document); /** * Creates a delete request to be added to a {@link RequestIndexer}. Note: the type field has * been deprecated since Elasticsearch 7.x and it would not take any effort. */ - DeleteRequest createDeleteRequest(String index, String docType, String key); + DeleteRequest createDeleteRequest(String index, String docType, String key, String routing); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java index 48762522..4ea8e1da 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -51,6 +51,7 @@ class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction private final XContentType contentType; private final RequestFactory requestFactory; private final Function createKey; + private final Function routingKey; public RowElasticsearchSinkFunction( IndexGenerator indexGenerator, @@ -58,13 +59,15 @@ public RowElasticsearchSinkFunction( SerializationSchema serializationSchema, XContentType contentType, RequestFactory requestFactory, - Function createKey) { + Function createKey, + Function routingKey) { this.indexGenerator = Preconditions.checkNotNull(indexGenerator); this.docType = docType; this.serializationSchema = Preconditions.checkNotNull(serializationSchema); this.contentType = Preconditions.checkNotNull(contentType); this.requestFactory = Preconditions.checkNotNull(requestFactory); this.createKey = Preconditions.checkNotNull(createKey); + this.routingKey = routingKey; } @Override @@ -96,12 +99,12 @@ private void processUpsert(RowData row, RequestIndexer indexer) { if (key != null) { final UpdateRequest updateRequest = requestFactory.createUpdateRequest( - indexGenerator.generate(row), docType, key, contentType, document); + indexGenerator.generate(row), docType, key, routingKey.apply(row), contentType, document); indexer.add(updateRequest); } else { final IndexRequest indexRequest = requestFactory.createIndexRequest( - indexGenerator.generate(row), docType, key, contentType, document); + indexGenerator.generate(row), docType, key, routingKey.apply(row), contentType, document); indexer.add(indexRequest); } } @@ -109,7 +112,7 @@ private void processUpsert(RowData row, RequestIndexer indexer) { private void processDelete(RowData row, RequestIndexer indexer) { final String key = createKey.apply(row); final DeleteRequest deleteRequest = - requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key); + requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key, routingKey.apply(row)); indexer.add(deleteRequest); } @@ -127,7 +130,8 @@ public boolean equals(Object o) { && Objects.equals(serializationSchema, that.serializationSchema) && contentType == that.contentType && Objects.equals(requestFactory, that.requestFactory) - && Objects.equals(createKey, that.createKey); + && Objects.equals(createKey, that.createKey) + && Objects.equals(routingKey, that.routingKey); } @Override @@ -138,6 +142,7 @@ public int hashCode() { serializationSchema, contentType, requestFactory, - createKey); + createKey, + routingKey); } } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java index b7479f98..786c60b3 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java @@ -31,6 +31,8 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.util.Arrays; +import java.util.List; import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; @@ -129,4 +131,82 @@ public void testAllTypesKey() { .isEqualTo( "1_2_3_4_true_1.0_2.0_ABCD_2012-12-12T12:12:12_2013-01-13T13:13:13_14:14:14_2015-05-15"); } + + @Test + public void testStringColumnsExtractor() { + TableSchema schema = + TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.STRING()) + .primaryKey("a") + .build(); + + Function keyExtractor = KeyExtractor.createColumnExtractor(schema, "_", "a,b"); + + String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD"))); + assertThat(key).isEqualTo("12_ABCD"); + } + + @Test + public void testListColumnsExtractor() { + TableSchema schema = + TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.STRING()) + .primaryKey("a") + .build(); + + Function keyExtractor = KeyExtractor.createColumnExtractor(schema, "_", + Arrays.asList("a", "b")); + + String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD"))); + assertThat(key).isEqualTo("12_ABCD"); + } + + @Test + public void testEmptyColumnsExtractor() { + TableSchema schema = + TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.STRING()) + .primaryKey("a") + .build(); + + String columns = null; + Function keyExtractor = KeyExtractor.createColumnExtractor(schema, "_", columns); + + String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD"))); + assertThat(key).isEqualTo(""); + } + + @Test + public void testBlankColumnsExtractor() { + TableSchema schema = + TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.STRING()) + .primaryKey("a") + .build(); + + Function keyExtractor = KeyExtractor.createColumnExtractor(schema, "_", ""); + + String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD"))); + assertThat(key).isEqualTo(""); + } + + @Test + public void testNullColumnsExtractor() { + TableSchema schema = + TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.STRING()) + .primaryKey("a") + .build(); + + List columns = null; + Function keyExtractor = KeyExtractor.createColumnExtractor(schema, "_", columns); + + String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD"))); + assertThat(key).isEqualTo(""); + } } diff --git a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java index 1a2cdd18..b8d46ba5 100644 --- a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java +++ b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java @@ -148,7 +148,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, REQUEST_FACTORY, - KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())); + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), + KeyExtractor.createColumnExtractor(schema, config.getKeyDelimiter(), config.getPartitionRoutingFields())); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); @@ -295,11 +296,15 @@ public UpdateRequest createUpdateRequest( String index, String docType, String key, + String routing, XContentType contentType, byte[] document) { - return new UpdateRequest(index, docType, key) - .doc(document, contentType) - .upsert(document, contentType); + UpdateRequest req = new UpdateRequest(index, docType, key) + .doc(document, contentType); + if (!StringUtils.isNullOrWhitespaceOnly(routing)) { + req = req.routing(routing); + } + return req.upsert(document, contentType); } @Override @@ -307,14 +312,23 @@ public IndexRequest createIndexRequest( String index, String docType, String key, + String routing, XContentType contentType, byte[] document) { - return new IndexRequest(index, docType, key).source(document, contentType); + IndexRequest req = new IndexRequest(index, docType, key); + if (!StringUtils.isNullOrWhitespaceOnly(routing)) { + req = req.routing(routing); + } + return req.source(document, contentType); } @Override - public DeleteRequest createDeleteRequest(String index, String docType, String key) { - return new DeleteRequest(index, docType, key); + public DeleteRequest createDeleteRequest(String index, String docType, String key, String routing) { + DeleteRequest req = new DeleteRequest(index, docType, key); + if (!StringUtils.isNullOrWhitespaceOnly(routing)) { + req = req.routing(routing); + } + return req; } } diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java index 1926e445..317e6b15 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java @@ -143,7 +143,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, REQUEST_FACTORY, - KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())); + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), + KeyExtractor.createColumnExtractor(schema, config.getKeyDelimiter(), config.getPartitionRoutingFields())); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); @@ -290,11 +291,15 @@ public UpdateRequest createUpdateRequest( String index, String docType, String key, + String routing, XContentType contentType, byte[] document) { - return new UpdateRequest(index, key) - .doc(document, contentType) - .upsert(document, contentType); + UpdateRequest req = new UpdateRequest(index, key) + .doc(document, contentType); + if (!StringUtils.isNullOrWhitespaceOnly(routing)) { + req = req.routing(routing); + } + return req.upsert(document, contentType); } @Override @@ -302,14 +307,23 @@ public IndexRequest createIndexRequest( String index, String docType, String key, + String routing, XContentType contentType, byte[] document) { - return new IndexRequest(index).id(key).source(document, contentType); + IndexRequest req = new IndexRequest(index).id(key); + if (!StringUtils.isNullOrWhitespaceOnly(routing)) { + req = req.routing(routing); + } + return req.source(document, contentType); } @Override - public DeleteRequest createDeleteRequest(String index, String docType, String key) { - return new DeleteRequest(index, key); + public DeleteRequest createDeleteRequest(String index, String docType, String key, String routing) { + DeleteRequest req = new DeleteRequest(index, key); + if (!StringUtils.isNullOrWhitespaceOnly(routing)) { + req = req.routing(routing); + } + return req; } } diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java index b516777d..8959a59f 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java @@ -66,6 +66,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PARTITION_ROUTING_FIELDS; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; @@ -104,7 +105,8 @@ public class Elasticsearch7DynamicTableFactory PARTIAL_CACHE_EXPIRE_AFTER_WRITE, PARTIAL_CACHE_MAX_ROWS, PARTIAL_CACHE_CACHE_MISSING_KEY, - MAX_RETRIES) + MAX_RETRIES, + PARTITION_ROUTING_FIELDS) .collect(Collectors.toSet()); @Override