From e28f2d96269e0bbe073d9c32a7d08dc522df614c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Mro=CC=81wka?= Date: Thu, 23 May 2024 08:14:40 +0200 Subject: [PATCH] Extract upsert from sink strategies as configurable option --- config/MongoSinkConnector.properties | 1 + .../connect/sink/MongoSinkTopicConfig.java | 20 +++++++ .../ReplaceOneBusinessKeyStrategy.java | 6 +- .../strategy/ReplaceOneDefaultStrategy.java | 14 ++++- ...UpdateOneBusinessKeyTimestampStrategy.java | 6 +- .../strategy/UpdateOneDefaultStrategy.java | 14 ++++- .../strategy/UpdateOneTimestampsStrategy.java | 14 ++++- .../strategy/WriteModelStrategyTest.java | 56 +++++++++++++++++++ 8 files changed, 118 insertions(+), 13 deletions(-) diff --git a/config/MongoSinkConnector.properties b/config/MongoSinkConnector.properties index 6c6ed575a..c7ab81147 100644 --- a/config/MongoSinkConnector.properties +++ b/config/MongoSinkConnector.properties @@ -31,6 +31,7 @@ post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder # Write configuration delete.on.null.values=false writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy +writemodel.strategy.upsert=true max.batch.size = 0 rate.limiting.timeout=0 diff --git a/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java b/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java index 9ab38d884..947f181ec 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java @@ -173,6 +173,12 @@ public String value() { static final String WRITEMODEL_STRATEGY_DEFAULT = "com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy"; + public static final String WRITEMODEL_STRATEGY_UPSERT_CONFIG = "writemodel.strategy.upsert"; + private static final String WRITEMODEL_STRATEGY_UPSERT_DISPLAY = "The upsert writeModel strategy"; + private static final String WRITEMODEL_STRATEGY_UPSERT_DOC = + "Whether or not to use upserts for the write model strategy"; + static final boolean WRITEMODEL_STRATEGY_UPSERT_DEFAULT = true; + public static final String DELETE_WRITEMODEL_STRATEGY_CONFIG = "delete.writemodel.strategy"; private static final String DELETE_WRITEMODEL_STRATEGY_DISPLAY = "The delete writeModel strategy"; private static final String DELETE_WRITEMODEL_STRATEGY_DOC = @@ -518,6 +524,10 @@ public IdStrategy getIdStrategy() { return idStrategy; } + public boolean isUpsertEnabled() { + return getBoolean(WRITEMODEL_STRATEGY_UPSERT_CONFIG); + } + PostProcessors getPostProcessors() { if (postProcessors == null) { postProcessors = new PostProcessors(this, getList(POST_PROCESSOR_CHAIN_CONFIG)); @@ -832,6 +842,16 @@ private static ConfigDef createConfigDef() { ++orderInGroup, ConfigDef.Width.MEDIUM, WRITEMODEL_STRATEGY_DISPLAY); + configDef.define( + WRITEMODEL_STRATEGY_UPSERT_CONFIG, + ConfigDef.Type.BOOLEAN, + WRITEMODEL_STRATEGY_UPSERT_DEFAULT, + ConfigDef.Importance.LOW, + WRITEMODEL_STRATEGY_UPSERT_DOC, + group, + ++orderInGroup, + ConfigDef.Width.MEDIUM, + WRITEMODEL_STRATEGY_UPSERT_DISPLAY); configDef.define( DELETE_WRITEMODEL_STRATEGY_CONFIG, ConfigDef.Type.STRING, diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneBusinessKeyStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneBusinessKeyStrategy.java index e4d399323..734e72701 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneBusinessKeyStrategy.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneBusinessKeyStrategy.java @@ -39,11 +39,12 @@ public class ReplaceOneBusinessKeyStrategy implements WriteModelStrategy, Configurable { - private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true); private boolean isPartialId = false; + private boolean isUpsertEnabled = true; @Override public WriteModel createWriteModel(final SinkDocument document) { + ReplaceOptions replaceOptions = new ReplaceOptions().upsert(isUpsertEnabled); BsonDocument vd = document .getValueDoc() @@ -65,12 +66,13 @@ public WriteModel createWriteModel(final SinkDocument document) { if (isPartialId) { businessKey = flattenKeys(businessKey); } - return new ReplaceOneModel<>(businessKey, vd, REPLACE_OPTIONS); + return new ReplaceOneModel<>(businessKey, vd, replaceOptions); } @Override public void configure(final MongoSinkTopicConfig configuration) { IdStrategy idStrategy = configuration.getIdStrategy(); + isUpsertEnabled = configuration.isUpsertEnabled(); isPartialId = idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy; } diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneDefaultStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneDefaultStrategy.java index 10d5d63f9..65cc4132e 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneDefaultStrategy.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneDefaultStrategy.java @@ -29,14 +29,17 @@ import com.mongodb.client.model.ReplaceOptions; import com.mongodb.client.model.WriteModel; +import com.mongodb.kafka.connect.sink.Configurable; +import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig; import com.mongodb.kafka.connect.sink.converter.SinkDocument; -public class ReplaceOneDefaultStrategy implements WriteModelStrategy { +public class ReplaceOneDefaultStrategy implements WriteModelStrategy, Configurable { - private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true); + private boolean isUpsertEnabled = true; @Override public WriteModel createWriteModel(final SinkDocument document) { + ReplaceOptions replaceOptions = new ReplaceOptions().upsert(isUpsertEnabled); BsonDocument vd = document .getValueDoc() @@ -51,6 +54,11 @@ public WriteModel createWriteModel(final SinkDocument document) { "Could not build the WriteModel,the `_id` field was missing unexpectedly"); } - return new ReplaceOneModel<>(new BsonDocument(ID_FIELD, idValue), vd, REPLACE_OPTIONS); + return new ReplaceOneModel<>(new BsonDocument(ID_FIELD, idValue), vd, replaceOptions); + } + + @Override + public void configure(final MongoSinkTopicConfig configuration) { + isUpsertEnabled = configuration.isUpsertEnabled(); } } diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneBusinessKeyTimestampStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneBusinessKeyTimestampStrategy.java index aa53cb8a3..4e397dc0e 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneBusinessKeyTimestampStrategy.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneBusinessKeyTimestampStrategy.java @@ -38,13 +38,14 @@ public class UpdateOneBusinessKeyTimestampStrategy implements WriteModelStrategy, Configurable { - private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true); static final String FIELD_NAME_MODIFIED_TS = "_modifiedTS"; static final String FIELD_NAME_INSERTED_TS = "_insertedTS"; private boolean isPartialId = false; + private boolean isUpsertEnabled = true; @Override public WriteModel createWriteModel(final SinkDocument document) { + UpdateOptions updateOptions = new UpdateOptions().upsert(isUpsertEnabled); BsonDocument vd = document .getValueDoc() @@ -71,12 +72,13 @@ public WriteModel createWriteModel(final SinkDocument document) { businessKey, new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime)) .append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, dateTime)), - UPDATE_OPTIONS); + updateOptions); } @Override public void configure(final MongoSinkTopicConfig configuration) { IdStrategy idStrategy = configuration.getIdStrategy(); + isUpsertEnabled = configuration.isUpsertEnabled(); isPartialId = idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy; } diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneDefaultStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneDefaultStrategy.java index bb5b0a3e1..a8075e5ec 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneDefaultStrategy.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneDefaultStrategy.java @@ -29,14 +29,17 @@ import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.model.WriteModel; +import com.mongodb.kafka.connect.sink.Configurable; +import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig; import com.mongodb.kafka.connect.sink.converter.SinkDocument; -public class UpdateOneDefaultStrategy implements WriteModelStrategy { +public class UpdateOneDefaultStrategy implements WriteModelStrategy, Configurable { - private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true); + private boolean isUpsertEnabled = true; @Override public WriteModel createWriteModel(final SinkDocument document) { + UpdateOptions updateOptions = new UpdateOptions().upsert(isUpsertEnabled); BsonDocument vd = document .getValueDoc() @@ -52,6 +55,11 @@ public WriteModel createWriteModel(final SinkDocument document) { } vd.remove(ID_FIELD); return new UpdateOneModel<>( - new BsonDocument(ID_FIELD, idValue), new BsonDocument("$set", vd), UPDATE_OPTIONS); + new BsonDocument(ID_FIELD, idValue), new BsonDocument("$set", vd), updateOptions); + } + + @Override + public void configure(final MongoSinkTopicConfig configuration) { + isUpsertEnabled = configuration.isUpsertEnabled(); } } diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneTimestampsStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneTimestampsStrategy.java index 5d5e1e280..73dbc765d 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneTimestampsStrategy.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneTimestampsStrategy.java @@ -30,15 +30,18 @@ import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.model.WriteModel; +import com.mongodb.kafka.connect.sink.Configurable; +import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig; import com.mongodb.kafka.connect.sink.converter.SinkDocument; -public class UpdateOneTimestampsStrategy implements WriteModelStrategy { - private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true); +public class UpdateOneTimestampsStrategy implements WriteModelStrategy, Configurable { static final String FIELD_NAME_MODIFIED_TS = "_modifiedTS"; static final String FIELD_NAME_INSERTED_TS = "_insertedTS"; + private boolean isUpsertEnabled = true; @Override public WriteModel createWriteModel(final SinkDocument document) { + UpdateOptions updateOptions = new UpdateOptions().upsert(isUpsertEnabled); BsonDocument vd = document .getValueDoc() @@ -58,6 +61,11 @@ public WriteModel createWriteModel(final SinkDocument document) { new BsonDocument(ID_FIELD, idValue), new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime)) .append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, dateTime)), - UPDATE_OPTIONS); + updateOptions); + } + + @Override + public void configure(final MongoSinkTopicConfig configuration) { + isUpsertEnabled = configuration.isUpsertEnabled(); } } diff --git a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java index ec48d913b..9d862f765 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java @@ -23,6 +23,7 @@ import static com.mongodb.kafka.connect.sink.SinkTestHelper.createTopicConfig; import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -42,6 +43,7 @@ import com.mongodb.client.model.UpdateOneModel; import com.mongodb.client.model.WriteModel; +import com.mongodb.kafka.connect.sink.Configurable; import com.mongodb.kafka.connect.sink.MongoSinkConfig; import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig; import com.mongodb.kafka.connect.sink.converter.SinkDocument; @@ -81,6 +83,7 @@ class WriteModelStrategyTest { MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_CONFIG, PartialKeyStrategy.class.getName()); configMap.put( MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_PARTIAL_KEY_PROJECTION_TYPE_CONFIG, "AllowList"); + configMap.put(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "true"); MongoSinkTopicConfig partialKeyConfig = new MongoSinkConfig(configMap).getMongoSinkTopicConfig(TEST_TOPIC); @@ -130,6 +133,59 @@ void testDefaultWriteModelStrategy() { defaultWriteModelStrategy.getWriteModelStrategy() instanceof InsertOneDefaultStrategy); } + @Test + @DisplayName("Ensure upsert config is correctly set") + void testWriteModelStrategyUpsertConfig() { + MongoSinkTopicConfig topicConfig; + ReplaceOneModel replaceOneModel; + UpdateOneModel updateOneModel; + WriteModel result; + + Object[] replaceStrategies = {REPLACE_ONE_BUSINESS_KEY_STRATEGY, REPLACE_ONE_DEFAULT_STRATEGY}; + for (Object strategy : replaceStrategies) { + Configurable configurableStrategy = (Configurable) strategy; + WriteModelStrategy writeStrategy = (WriteModelStrategy) strategy; + + topicConfig = + createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "false"); + configurableStrategy.configure(topicConfig); + result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone())); + replaceOneModel = (ReplaceOneModel) result; + assertFalse(replaceOneModel.getReplaceOptions().isUpsert()); + + topicConfig = + createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "true"); + configurableStrategy.configure(topicConfig); + result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone())); + replaceOneModel = (ReplaceOneModel) result; + assertTrue(replaceOneModel.getReplaceOptions().isUpsert()); + } + + Object[] updateStrategies = { + UPDATE_ONE_BUSINESS_KEY_TIMESTAMPS_STRATEGY, + UPDATE_ONE_DEFAULT_STRATEGY, + UPDATE_ONE_TIMESTAMPS_STRATEGY, + }; + for (Object strategy : updateStrategies) { + Configurable configurableStrategy = (Configurable) strategy; + WriteModelStrategy writeStrategy = (WriteModelStrategy) strategy; + + topicConfig = + createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "false"); + configurableStrategy.configure(topicConfig); + result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone())); + updateOneModel = (UpdateOneModel) result; + assertFalse(updateOneModel.getOptions().isUpsert()); + + topicConfig = + createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "true"); + configurableStrategy.configure(topicConfig); + result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone())); + updateOneModel = (UpdateOneModel) result; + assertTrue(updateOneModel.getOptions().isUpsert()); + } + } + @Test @DisplayName( "when sink document is valid for InsertOneDefaultStrategy then correct InsertOneModel")