From e6c82711f924440d20c1330d182e9499c1e01b5b Mon Sep 17 00:00:00 2001 From: weissmedia Date: Mon, 1 Apr 2024 21:09:42 +0200 Subject: [PATCH 1/9] implements: The Kafka record value can be any format. If a value is null then the member is removed from the set (instead of added to the set). --- .../redis/kafka/connect/operation/Sadd.java | 32 +++++++++++++++++++ .../kafka/connect/sink/RedisSinkTask.java | 8 ++++- 2 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/Sadd.java diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/Sadd.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/Sadd.java new file mode 100644 index 0000000..e326a5c --- /dev/null +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/Sadd.java @@ -0,0 +1,32 @@ +package com.redis.kafka.connect.operation; + +import com.redis.spring.batch.writer.operation.AbstractKeyWriteOperation; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.async.BaseRedisAsyncCommands; +import io.lettuce.core.api.async.RedisSetAsyncCommands; +import java.util.function.Function; + +public class Sadd extends AbstractKeyWriteOperation { + private Function valueFunction; + private Function conditionFunction; + + public Sadd() { + } + + public void setValueFunction(Function function) { + this.valueFunction = function; + } + + public void setConditionFunction(Function function) { + this.conditionFunction = function; + } + + protected RedisFuture execute(BaseRedisAsyncCommands commands, T item, K key) { + V value = valueFunction.apply(item); + if (conditionFunction.apply(item)) { + return ((RedisSetAsyncCommands) commands).srem(key, value); + } else { + return ((RedisSetAsyncCommands) commands).sadd(key, value); + } + } +} diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java index cec8ee1..964eece 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java @@ -59,7 +59,8 @@ import com.redis.spring.batch.writer.operation.JsonSet; import com.redis.spring.batch.writer.operation.Lpush; import com.redis.spring.batch.writer.operation.Rpush; -import com.redis.spring.batch.writer.operation.Sadd; +//import com.redis.spring.batch.writer.operation.Sadd; +import com.redis.kafka.connect.operation.Sadd; import com.redis.spring.batch.writer.operation.Set; import com.redis.spring.batch.writer.operation.TsAdd; import com.redis.spring.batch.writer.operation.Xadd; @@ -191,6 +192,7 @@ private WriteOperation operation() { Sadd sadd = new Sadd<>(); sadd.setKeyFunction(this::collectionKey); sadd.setValueFunction(this::member); + sadd.setConditionFunction(this::isNullValue); return sadd; case TSADD: TsAdd tsAdd = new TsAdd<>(); @@ -211,6 +213,10 @@ private WriteOperation operation() { } } + private boolean isNullValue(SinkRecord sinkRecord) { + return sinkRecord.value() == null; + } + private byte[] value(SinkRecord sinkRecord) { return bytes("value", sinkRecord.value()); } From 94e93c9dc4a409d9ec74419a312ba71287b6058b Mon Sep 17 00:00:00 2001 From: weissmedia Date: Tue, 2 Apr 2024 20:26:06 +0200 Subject: [PATCH 2/9] implements: If value is null, the key is deleted for JsonSet. --- .../kafka/connect/operation/JsonSet.java | 49 +++++++++++++++++++ .../kafka/connect/sink/RedisSinkTask.java | 4 +- 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/JsonSet.java diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/JsonSet.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/JsonSet.java new file mode 100644 index 0000000..4869948 --- /dev/null +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/JsonSet.java @@ -0,0 +1,49 @@ +package com.redis.kafka.connect.operation; + +import com.redis.lettucemod.api.async.RedisJSONAsyncCommands; +import com.redis.spring.batch.writer.operation.AbstractKeyWriteOperation; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.async.BaseRedisAsyncCommands; +import io.lettuce.core.api.async.RedisKeyAsyncCommands; + +import java.util.function.Function; + +public class JsonSet extends AbstractKeyWriteOperation { + public static final String ROOT_PATH = "$"; + private Function pathFunction = (t) -> { + return "$"; + }; + private Function valueFunction; + private Function conditionFunction; + + public JsonSet() { + } + + public void setPath(String path) { + this.pathFunction = (t) -> { + return path; + }; + } + + public void setPathFunction(Function path) { + this.pathFunction = path; + } + + public void setValueFunction(Function value) { + this.valueFunction = value; + } + + public void setConditionFunction(Function function) { + this.conditionFunction = function; + } + + protected RedisFuture execute(BaseRedisAsyncCommands commands, T item, K key) { + if (conditionFunction.apply(item)) { + return ((RedisKeyAsyncCommands)commands).del(new Object[]{key}); + } else { + String path = (String)this.pathFunction.apply(item); + V value = this.valueFunction.apply(item); + return ((RedisJSONAsyncCommands)commands).jsonSet(key, path, value); + } + } +} diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java index 964eece..1816017 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java @@ -56,7 +56,8 @@ import com.redis.spring.batch.writer.WriteOperation; import com.redis.spring.batch.writer.operation.Del; import com.redis.spring.batch.writer.operation.Hset; -import com.redis.spring.batch.writer.operation.JsonSet; +//import com.redis.spring.batch.writer.operation.JsonSet; +import com.redis.kafka.connect.operation.JsonSet; import com.redis.spring.batch.writer.operation.Lpush; import com.redis.spring.batch.writer.operation.Rpush; //import com.redis.spring.batch.writer.operation.Sadd; @@ -167,6 +168,7 @@ private WriteOperation operation() { JsonSet jsonSet = new JsonSet<>(); jsonSet.setKeyFunction(this::key); jsonSet.setValueFunction(this::jsonValue); + jsonSet.setConditionFunction(this::isNullValue); return jsonSet; case SET: Set set = new Set<>(); From 068c2a5cdda63b34c04c9603cf2aa1f091d39cfb Mon Sep 17 00:00:00 2001 From: Mathias Weiss - IGS Extern Date: Wed, 10 Jul 2024 17:17:29 +0200 Subject: [PATCH 3/9] feat: Add support and documentation for redis.json.path and redis.json.path.fixed configurations - Implement functionality for dynamic and fixed JSON path handling with JSONMERGE command - Update RedisSinkConfigDef to validate and apply redis.json.path and redis.json.path.fixed - Enhance documentation to explain usage and configuration of redis.json.path and redis.json.path.fixed --- .../kafka/connect/operation/JsonMerge.java | 122 ++++++++++++ .../kafka/connect/sink/RedisSinkConfig.java | 23 ++- .../connect/sink/RedisSinkConfigDef.java | 25 +++ .../kafka/connect/sink/RedisSinkTask.java | 37 ++++ .../connect/AbstractSinkIntegrationTests.java | 184 ++++++++++++++++++ docs/guide/src/docs/asciidoc/sink.adoc | 59 ++++-- 6 files changed, 438 insertions(+), 12 deletions(-) create mode 100644 core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/JsonMerge.java diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/JsonMerge.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/JsonMerge.java new file mode 100644 index 0000000..48b738b --- /dev/null +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/operation/JsonMerge.java @@ -0,0 +1,122 @@ +package com.redis.kafka.connect.operation; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.redis.lettucemod.api.async.RedisJSONAsyncCommands; +import com.redis.spring.batch.writer.operation.AbstractKeyWriteOperation; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.async.BaseRedisAsyncCommands; +import io.lettuce.core.api.async.RedisKeyAsyncCommands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.function.Function; + +public class JsonMerge extends AbstractKeyWriteOperation { + private static final Logger log = LoggerFactory.getLogger(JsonSet.class); + + public static final String ROOT_PATH = "$"; + private static final Function DEFAULT_PATH_FUNCTION = t -> ROOT_PATH; + + private Function pathFunction; + private Function valueFunction; + private Function conditionFunction; + + private final ObjectMapper mapper; + + public JsonMerge() { + // Set default path function + this.pathFunction = (Function) DEFAULT_PATH_FUNCTION; + this.mapper = new ObjectMapper(); + this.mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + } + + public void setPath(String path) { + this.pathFunction = t -> path; + } + + public void setPathFunction(Function pathFunction) { + this.pathFunction = pathFunction; + } + + public void setValueFunction(Function valueFunction) { + this.valueFunction = valueFunction; + } + + public void setConditionFunction(Function conditionFunction) { + this.conditionFunction = conditionFunction; + } + + protected RedisFuture execute(BaseRedisAsyncCommands commands, T item, K key) { + try { + String path = determinePath(item); + logPath(path); + V value = this.valueFunction.apply(item); + log.info("Value: {}", value); + + if (conditionFunction.apply(item)) { + if (isPathSet()) { + return deleteJsonPath(commands, key, path); + } else { + return deleteKey(commands, key); + } + } +System.out.println("isPathSet() "+isPathSet()); + // Perform JSON operation based on whether path is set + if (isPathSet()) { + + return performJsonMerge(commands, key, path, value); + } else { + return performJsonSet(commands, key, value); + } + } catch (JsonProcessingException e) { + log.error("Error processing JSON", e); + return null; + } catch (Exception e) { + log.error("Error executing Redis command", e); + return null; + } + } + + private RedisFuture deleteKey(BaseRedisAsyncCommands commands, K key) { + return ((RedisKeyAsyncCommands) commands).del(key); + } + + private RedisFuture deleteJsonPath(BaseRedisAsyncCommands commands, K key, String path) { + return ((RedisJSONAsyncCommands) commands).jsonDel(key, path); + } + + private String determinePath(T item) { + return this.pathFunction.apply(item); + } + + private void logPath(String path) { + if (isPathSet()) { + log.info("Path is set to: {}", path); + } else { + log.info("Path is not set, using default: {}", ROOT_PATH); + } + } + + @SuppressWarnings("unchecked") + private RedisFuture performJsonMerge(BaseRedisAsyncCommands commands, K key, String path, V value) throws JsonProcessingException { + // Convert empty JSON object + String emptyJson = mapper.writeValueAsString(new Object()); + byte[] emptyJsonBytes = emptyJson.getBytes(StandardCharsets.UTF_8); + + // Merge empty JSON object first + ((RedisJSONAsyncCommands) commands).jsonMerge(key, ROOT_PATH, (V) emptyJsonBytes); + // Merge actual value + return ((RedisJSONAsyncCommands) commands).jsonMerge(key, path, value); + } + + private RedisFuture performJsonSet(BaseRedisAsyncCommands commands, K key, V value) throws JsonProcessingException { + return ((RedisJSONAsyncCommands) commands).jsonSet(key, ROOT_PATH, value); + } + + private boolean isPathSet() { + return this.pathFunction != DEFAULT_PATH_FUNCTION; + } +} diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java index 5adfd05..2416206 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java @@ -25,7 +25,7 @@ public class RedisSinkConfig extends RedisConfig { public enum RedisCommand { - HSET, JSONSET, TSADD, SET, XADD, LPUSH, RPUSH, SADD, ZADD, DEL + HSET, JSONSET, JSONMERGE, TSADD, SET, XADD, LPUSH, RPUSH, SADD, ZADD, DEL } public static final RedisSinkConfigDef CONFIG = new RedisSinkConfigDef(); @@ -44,6 +44,10 @@ public enum RedisCommand { private final Duration waitTimeout; + private final String jsonPath; + + private final String fixedJsonPath; + public RedisSinkConfig(Map originals) { super(new RedisSinkConfigDef(), originals); String charsetName = getString(RedisSinkConfigDef.CHARSET_CONFIG).trim(); @@ -54,6 +58,14 @@ public RedisSinkConfig(Map originals) { multiExec = Boolean.TRUE.equals(getBoolean(RedisSinkConfigDef.MULTIEXEC_CONFIG)); waitReplicas = getInt(RedisSinkConfigDef.WAIT_REPLICAS_CONFIG); waitTimeout = Duration.ofMillis(getLong(RedisSinkConfigDef.WAIT_TIMEOUT_CONFIG)); + + if (command == RedisCommand.JSONMERGE) { + jsonPath = getString(RedisSinkConfigDef.JSON_PATH_CONFIG).trim(); + fixedJsonPath = getString(RedisSinkConfigDef.FIXED_JSON_PATH_CONFIG).trim(); + } else { + jsonPath = null; + fixedJsonPath = null; + } } public Charset getCharset() { @@ -84,6 +96,15 @@ public Duration getWaitTimeout() { return waitTimeout; } + public String getJsonPath() { + return jsonPath; + } + + public String getFixedJsonPath() { + return fixedJsonPath; + } + + @Override public int hashCode() { final int prime = 31; diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfigDef.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfigDef.java index ea5b835..de09e63 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfigDef.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfigDef.java @@ -59,6 +59,18 @@ public class RedisSinkConfigDef extends RedisConfigDef { public static final RedisCommand COMMAND_DEFAULT = RedisCommand.XADD; + public static final String JSON_PATH_CONFIG = "redis.json.path"; + + public static final String JSON_PATH_DEFAULT = "$"; + + public static final String JSON_PATH_DOC = "The JSON attribute in the header of the Kafka message based on which the JSON path will be dynamically set."; + + public static final String FIXED_JSON_PATH_CONFIG = "redis.json.path.fixed"; + + public static final String FIXED_JSON_PATH_DEFAULT = "$"; + + public static final String FIXED_JSON_PATH_DOC = "The fixed JSON path to set within the value in the header of the Kafka message. This path will be used if the dynamic path is not present."; + public static final String COMMAND_DOC = "Destination data structure: " + String.join(",", Stream.of(RedisCommand.values()).map(RedisCommand::name).toArray(String[]::new)); @@ -83,6 +95,8 @@ private void define() { define(MULTIEXEC_CONFIG, Type.BOOLEAN, MULTIEXEC_DEFAULT, Importance.MEDIUM, MULTIEXEC_DOC); define(WAIT_REPLICAS_CONFIG, Type.INT, WAIT_REPLICAS_DEFAULT, Importance.MEDIUM, WAIT_REPLICAS_DOC); define(WAIT_TIMEOUT_CONFIG, Type.LONG, WAIT_TIMEOUT_DEFAULT, Importance.MEDIUM, WAIT_TIMEOUT_DOC); + define(JSON_PATH_CONFIG, Type.STRING, JSON_PATH_DEFAULT, Importance.MEDIUM, JSON_PATH_DOC); + define(FIXED_JSON_PATH_CONFIG, Type.STRING, FIXED_JSON_PATH_DEFAULT, Importance.MEDIUM, FIXED_JSON_PATH_DOC); } @Override @@ -92,6 +106,17 @@ public Map validateAll(Map props) { return results; } RedisCommand command = redisCommand(props); + + // Ensure JSON_PATH_CONFIG and FIXED_JSON_PATH_CONFIG are only set if the command is JSONMERGE + if (command != RedisCommand.JSONMERGE) { + if (props.containsKey(JSON_PATH_CONFIG)) { + results.get(JSON_PATH_CONFIG).addErrorMessage("The JSON path configuration is not allowed unless the command is JSONMERGE."); + } + if (props.containsKey(FIXED_JSON_PATH_CONFIG)) { + results.get(FIXED_JSON_PATH_CONFIG).addErrorMessage("The fixed JSON path configuration is not allowed unless the command is JSONMERGE."); + } + } + String multiexec = props.getOrDefault(MULTIEXEC_CONFIG, MULTIEXEC_DEFAULT).trim(); if (multiexec.equalsIgnoreCase("true") && !MULTI_EXEC_COMMANDS.contains(command)) { String supportedTypes = String.join(", ", MULTI_EXEC_COMMANDS.stream().map(Enum::name).toArray(String[]::new)); diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java index 1816017..d4520a9 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java @@ -34,6 +34,8 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; @@ -58,6 +60,7 @@ import com.redis.spring.batch.writer.operation.Hset; //import com.redis.spring.batch.writer.operation.JsonSet; import com.redis.kafka.connect.operation.JsonSet; +import com.redis.kafka.connect.operation.JsonMerge; import com.redis.spring.batch.writer.operation.Lpush; import com.redis.spring.batch.writer.operation.Rpush; //import com.redis.spring.batch.writer.operation.Sadd; @@ -170,6 +173,13 @@ private WriteOperation operation() { jsonSet.setValueFunction(this::jsonValue); jsonSet.setConditionFunction(this::isNullValue); return jsonSet; + case JSONMERGE: + JsonMerge jsonMerge = new JsonMerge<>(); + jsonMerge.setKeyFunction(this::key); + jsonMerge.setValueFunction(this::jsonValue); + jsonMerge.setConditionFunction(this::isNullValue); + jsonMerge.setPathFunction(this::determineJsonPath); + return jsonMerge; case SET: Set set = new Set<>(); set.setKeyFunction(this::key); @@ -315,6 +325,33 @@ private Map map(SinkRecord sinkRecord) { throw new ConnectException("Unsupported source value type: " + sinkRecord.valueSchema().type().name()); } + private String determineJsonPath(SinkRecord record) { + try { + Headers headers = record.headers(); + String jsonPath = getJsonPathFromHeader(headers, config.getJsonPath()); + + if (jsonPath == null) { + return config.getFixedJsonPath(); + } + + return jsonPath; + } catch (Exception e) { + log.error("Error determining JSON path: ", e); + return config.getFixedJsonPath(); + } + } + + private String getJsonPathFromHeader(Headers headers, String path) { + for (Header header : headers) { + System.out.println("path "+path); + System.out.println("header "+header); + if (header.key().equals(path)) { + return header.value().toString(); + } + } + return null; + } + @Override public void stop() { if (writer != null) { diff --git a/core/redis-kafka-connect/src/test/java/com/redis/kafka/connect/AbstractSinkIntegrationTests.java b/core/redis-kafka-connect/src/test/java/com/redis/kafka/connect/AbstractSinkIntegrationTests.java index 09403c8..baab354 100644 --- a/core/redis-kafka-connect/src/test/java/com/redis/kafka/connect/AbstractSinkIntegrationTests.java +++ b/core/redis-kafka-connect/src/test/java/com/redis/kafka/connect/AbstractSinkIntegrationTests.java @@ -23,6 +23,8 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; import org.junit.jupiter.api.AfterEach; @@ -68,6 +70,18 @@ public static SinkRecord write(String topic, SchemaAndValue key, SchemaAndValue TimestampType.CREATE_TIME); } + public static SinkRecord writeWithHeaders(String topic, SchemaAndValue key, SchemaAndValue value, Headers headers) { + Preconditions.notNull(topic, "topic cannot be null"); + Preconditions.notNull(key, "key cannot be null."); + Preconditions.notNull(key.value(), "key cannot be null."); + Preconditions.notNull(value, "value cannot be null."); + Preconditions.notNull(value.value(), "value cannot be null."); + Preconditions.notNull(headers, "headers cannot be null."); + + return new SinkRecord(topic, PARTITION, key.schema(), key.value(), value.schema(), value.value(), OFFSET, TIMESTAMP, + TimestampType.CREATE_TIME, headers); + } + public static SinkRecord delete(String topic, SchemaAndValue key) { Preconditions.notNull(topic, "topic cannot be null"); if (null == key) { @@ -323,6 +337,176 @@ void putJSON() throws JsonProcessingException { } } + @Test + void mergeJSON() throws JsonProcessingException { + String topic = "mergeJSON"; + List persons = new ArrayList<>(); + Person person1 = new Person(); + person1.setId(1); + person1.setName("Bodysnitch Canderbunt"); + person1.setHobbies(new HashSet<>(Arrays.asList("Fishing", "Singing"))); + Address address1 = new Address(); + address1.setCity("New York"); + address1.setZip("10013"); + address1.setState("NY"); + address1.setStreet("150 Mott St"); + person1.setAddress(address1); + persons.add(person1); + Person person2 = new Person(); + person2.setId(2); + person2.setName("Buffalo Custardbath"); + person2.setHobbies(new HashSet<>(Arrays.asList("Surfing", "Piano"))); + Address address2 = new Address(); + address2.setCity("Los Angeles"); + address2.setZip("90001"); + address2.setState("CA"); + address2.setStreet("123 Sunset Blvd"); + person2.setAddress(address2); + persons.add(person2); + Person person3 = new Person(); + person3.setId(3); + person3.setName("Bumblesnuff Crimpysnitch"); + person3.setHobbies(new HashSet<>(Arrays.asList("Skiing", "Drums"))); + Address address3 = new Address(); + address3.setCity("Chicago"); + address3.setZip("60603"); + address3.setState("IL"); + address3.setStreet("100 S State St"); + person3.setAddress(address3); + persons.add(person3); + List records = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); + + for (Person person : persons) { + String json = mapper.writeValueAsString(person); + + SchemaAndValue keySchemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, person.getId()); + SchemaAndValue valueSchemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, json); + + records.add(write(topic, keySchemaAndValue, valueSchemaAndValue)); + } + put(topic, RedisCommand.JSONMERGE, records); + for (Person person : persons) { + String json = connection.sync().jsonGet(topic + ":" + person.getId()); + assertEquals(person, mapper.readValue(json, Person.class)); + } + } + + @Test + void mergeJSONWithPath() throws JsonProcessingException { + String topic = "mergeJSONWithPath"; + String jsonPath = "person"; + List persons = new ArrayList<>(); + Person person1 = new Person(); + person1.setId(1); + person1.setName("Bodysnitch Canderbunt"); + person1.setHobbies(new HashSet<>(Arrays.asList("Fishing", "Singing"))); + Address address1 = new Address(); + address1.setCity("New York"); + address1.setZip("10013"); + address1.setState("NY"); + address1.setStreet("150 Mott St"); + person1.setAddress(address1); + persons.add(person1); + Person person2 = new Person(); + person2.setId(2); + person2.setName("Buffalo Custardbath"); + person2.setHobbies(new HashSet<>(Arrays.asList("Surfing", "Piano"))); + Address address2 = new Address(); + address2.setCity("Los Angeles"); + address2.setZip("90001"); + address2.setState("CA"); + address2.setStreet("123 Sunset Blvd"); + person2.setAddress(address2); + persons.add(person2); + Person person3 = new Person(); + person3.setId(3); + person3.setName("Bumblesnuff Crimpysnitch"); + person3.setHobbies(new HashSet<>(Arrays.asList("Skiing", "Drums"))); + Address address3 = new Address(); + address3.setCity("Chicago"); + address3.setZip("60603"); + address3.setState("IL"); + address3.setStreet("100 S State St"); + person3.setAddress(address3); + persons.add(person3); + List records = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); + + for (Person person : persons) { + String json = mapper.writeValueAsString(person); + + Headers headers = new ConnectHeaders(); + headers.addString("source", jsonPath); + + SchemaAndValue keySchemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, person.getId()); + SchemaAndValue valueSchemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, json); + + records.add(writeWithHeaders(topic, keySchemaAndValue, valueSchemaAndValue, headers)); + } + put(topic, RedisCommand.JSONMERGE, records,RedisSinkConfigDef.JSON_PATH_CONFIG, "source"); + for (Person person : persons) { + String json = connection.sync().jsonGet(topic + ":" + person.getId(), jsonPath); + assertEquals(person, mapper.readValue(json, Person.class)); + } + } + + @Test + void mergeJSONWithFixedPath() throws JsonProcessingException { + String topic = "mergeJSONWithFixedPath"; + String jsonPath = "person-fixed"; + List persons = new ArrayList<>(); + Person person1 = new Person(); + person1.setId(1); + person1.setName("Bodysnitch Canderbunt"); + person1.setHobbies(new HashSet<>(Arrays.asList("Fishing", "Singing"))); + Address address1 = new Address(); + address1.setCity("New York"); + address1.setZip("10013"); + address1.setState("NY"); + address1.setStreet("150 Mott St"); + person1.setAddress(address1); + persons.add(person1); + Person person2 = new Person(); + person2.setId(2); + person2.setName("Buffalo Custardbath"); + person2.setHobbies(new HashSet<>(Arrays.asList("Surfing", "Piano"))); + Address address2 = new Address(); + address2.setCity("Los Angeles"); + address2.setZip("90001"); + address2.setState("CA"); + address2.setStreet("123 Sunset Blvd"); + person2.setAddress(address2); + persons.add(person2); + Person person3 = new Person(); + person3.setId(3); + person3.setName("Bumblesnuff Crimpysnitch"); + person3.setHobbies(new HashSet<>(Arrays.asList("Skiing", "Drums"))); + Address address3 = new Address(); + address3.setCity("Chicago"); + address3.setZip("60603"); + address3.setState("IL"); + address3.setStreet("100 S State St"); + person3.setAddress(address3); + persons.add(person3); + List records = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); + + for (Person person : persons) { + String json = mapper.writeValueAsString(person); + + SchemaAndValue keySchemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, person.getId()); + SchemaAndValue valueSchemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, json); + + records.add(write(topic, keySchemaAndValue, valueSchemaAndValue)); + } + put(topic, RedisCommand.JSONMERGE, records,RedisSinkConfigDef.FIXED_JSON_PATH_CONFIG, jsonPath); + for (Person person : persons) { + String json = connection.sync().jsonGet(topic + ":" + person.getId(), jsonPath); + assertEquals(person, mapper.readValue(json, Person.class)); + } + } + @Test void putTimeSeries() { String topic = "putTimeSeries"; diff --git a/docs/guide/src/docs/asciidoc/sink.adoc b/docs/guide/src/docs/asciidoc/sink.adoc index 21c7d65..c746a17 100644 --- a/docs/guide/src/docs/asciidoc/sink.adoc +++ b/docs/guide/src/docs/asciidoc/sink.adoc @@ -74,7 +74,7 @@ value.converter = <2> If value is null the key is deleted. [[_sink_json]] -=== JSON +=== JSON (JSONSET) Use the following properties to write Kafka records as RedisJSON documents: [source,properties] @@ -87,6 +87,22 @@ value.converter = <2> <2> <<_sink_value_string,String>>, <<_sink_value_bytes,bytes>>, or <<_sink_value_avro,Avro>>. If value is null the key is deleted. +[source,properties] +---- +redis.command = JSONMERGE +key.converter = <1> +value.converter = <2> +redis.json.path =
<3> +redis.json.path.fixed = <4> +---- +<1> <<_sink_key_string,String>>, <<_sink_key_bytes,bytes>>, or <<_sink_value_avro,Avro>> +<2> <<_sink_value_string,String>>, <<_sink_value_bytes,bytes>>, or <<_sink_value_avro,Avro>>. +<3> <<_sink_header_json_path,Header JSON Path>> +<4> <<_sink_fixed_json_path,Fixed JSON Path>> +If value is null the key is deleted. ++ +The `JSONMERGE` command has the same functionality as `JSONSET` but with the added ability to specify a JSON path. + [[_sink_stream]] === Stream Use the following properties to store Kafka records as Redis stream messages: @@ -260,21 +276,42 @@ value.converter.schemas.enable = <1> ---- <1> Set to `true` if the JSON record structure has an attached schema +[[_sink_header_json_path]] +==== Header JSON Path +The JSON attribute in the header of the Kafka message based on which the JSON path will be dynamically set. + +[source,properties] +---- +redis.json.path =
+---- + +[[_sink_fixed_json_path]] +==== Fixed JSON Path +The fixed JSON path to set within the value in the header of the Kafka message. This path will be used if the dynamic path is not present. + +[source,properties] +---- +redis.json.path.fixed = +---- + [[_sink_config]] == Configuration [source,properties] ---- -connector.class = com.redis.kafka.connect.RedisSinkConnector -topics = <1> -redis.uri = <2> -redis.command = <3> -key.converter = <4> -value.converter = <5> +connector.class = com.redis.kafka.connect.RedisSinkConnector +topics = <1> +redis.uri = <2> +redis.command = <3> +redis.json.path =
<4> +redis.json.path.fixed = <5> +key.converter = <6> +value.converter = <7> ---- -<1> Kafka topics to read messsages from. +<1> Kafka topics to read messages from. <2> <<_sink_redis_client,Redis URI>>. <3> <<_sink_redis_command,Redis command>>. -<4> <<_sink_key,Key converter>>. -<5> <<_sink_value,Value converter>>. - +<4> <<_sink_header_json_path,Header JSON Path>>. +<5> <<_sink_fixed_json_path,Fixed JSON Path>>. +<6> <<_sink_key,Key converter>>. +<7> <<_sink_value,Value converter>>. From 520405f34a8033ba04e27570af332306d5a54501 Mon Sep 17 00:00:00 2001 From: Mathias Weiss - IGS Extern Date: Wed, 10 Jul 2024 17:32:06 +0200 Subject: [PATCH 4/9] doc: fix json title --- docs/guide/src/docs/asciidoc/sink.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/guide/src/docs/asciidoc/sink.adoc b/docs/guide/src/docs/asciidoc/sink.adoc index c746a17..8725319 100644 --- a/docs/guide/src/docs/asciidoc/sink.adoc +++ b/docs/guide/src/docs/asciidoc/sink.adoc @@ -74,7 +74,7 @@ value.converter = <2> If value is null the key is deleted. [[_sink_json]] -=== JSON (JSONSET) +=== JSON Use the following properties to write Kafka records as RedisJSON documents: [source,properties] From a6e89426b78a9e0d3c6014c662a0ec91d2bc3958 Mon Sep 17 00:00:00 2001 From: Mathias Weiss - IGS Extern Date: Wed, 10 Jul 2024 17:34:26 +0200 Subject: [PATCH 5/9] doc: improved configuration section --- docs/guide/src/docs/asciidoc/sink.adoc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/guide/src/docs/asciidoc/sink.adoc b/docs/guide/src/docs/asciidoc/sink.adoc index 8725319..a99a63e 100644 --- a/docs/guide/src/docs/asciidoc/sink.adoc +++ b/docs/guide/src/docs/asciidoc/sink.adoc @@ -299,14 +299,14 @@ redis.json.path.fixed = [source,properties] ---- -connector.class = com.redis.kafka.connect.RedisSinkConnector -topics = <1> -redis.uri = <2> -redis.command = <3> -redis.json.path =
<4> -redis.json.path.fixed = <5> -key.converter = <6> -value.converter = <7> +connector.class = com.redis.kafka.connect.RedisSinkConnector +topics = <1> +redis.uri = <2> +redis.command = <3> +redis.json.path =
<4> +redis.json.path.fixed = <5> +key.converter = <6> +value.converter = <7> ---- <1> Kafka topics to read messages from. <2> <<_sink_redis_client,Redis URI>>. From a7d06bbef749d0c55559a7844424a5c3b744f3c5 Mon Sep 17 00:00:00 2001 From: Mathias Weiss - IGS Extern Date: Wed, 10 Jul 2024 17:45:07 +0200 Subject: [PATCH 6/9] clarify JSON path behavior in documentation: delete specific JSON object when path is set, delete entire key when path is not set --- docs/guide/src/docs/asciidoc/sink.adoc | 28 +++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/docs/guide/src/docs/asciidoc/sink.adoc b/docs/guide/src/docs/asciidoc/sink.adoc index a99a63e..9f05d5c 100644 --- a/docs/guide/src/docs/asciidoc/sink.adoc +++ b/docs/guide/src/docs/asciidoc/sink.adoc @@ -85,21 +85,21 @@ value.converter = <2> ---- <1> <<_sink_key_string,String>>, <<_sink_key_bytes,bytes>>, or <<_sink_value_avro,Avro>> <2> <<_sink_value_string,String>>, <<_sink_value_bytes,bytes>>, or <<_sink_value_avro,Avro>>. -If value is null the key is deleted. +If value is null, the entire key will be deleted. [source,properties] ---- -redis.command = JSONMERGE -key.converter = <1> -value.converter = <2> -redis.json.path =
<3> +redis.command = JSONMERGE +key.converter = <1> +value.converter = <2> +redis.json.path =
<3> redis.json.path.fixed = <4> ---- <1> <<_sink_key_string,String>>, <<_sink_key_bytes,bytes>>, or <<_sink_value_avro,Avro>> <2> <<_sink_value_string,String>>, <<_sink_value_bytes,bytes>>, or <<_sink_value_avro,Avro>>. <3> <<_sink_header_json_path,Header JSON Path>> <4> <<_sink_fixed_json_path,Fixed JSON Path>> -If value is null the key is deleted. +If value is null, the JSON object at the specified path will be deleted. If no path is set, the entire key will be deleted. + The `JSONMERGE` command has the same functionality as `JSONSET` but with the added ability to specify a JSON path. @@ -299,14 +299,14 @@ redis.json.path.fixed = [source,properties] ---- -connector.class = com.redis.kafka.connect.RedisSinkConnector -topics = <1> -redis.uri = <2> -redis.command = <3> -redis.json.path =
<4> -redis.json.path.fixed = <5> -key.converter = <6> -value.converter = <7> +connector.class = com.redis.kafka.connect.RedisSinkConnector +topics = <1> +redis.uri = <2> +redis.command = <3> +redis.json.path =
<4> +redis.json.path.fixed = <5> +key.converter = <6> +value.converter = <7> ---- <1> Kafka topics to read messages from. <2> <<_sink_redis_client,Redis URI>>. From 4bb6b01bdd03be79dcf1fb6adc97d53db550ee79 Mon Sep 17 00:00:00 2001 From: Mathias Weiss - IGS Extern Date: Wed, 10 Jul 2024 17:48:37 +0200 Subject: [PATCH 7/9] updated JSON path delete behavior in documentation --- docs/guide/src/docs/asciidoc/sink.adoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/guide/src/docs/asciidoc/sink.adoc b/docs/guide/src/docs/asciidoc/sink.adoc index 9f05d5c..2f945ed 100644 --- a/docs/guide/src/docs/asciidoc/sink.adoc +++ b/docs/guide/src/docs/asciidoc/sink.adoc @@ -96,11 +96,11 @@ redis.json.path =
<3> redis.json.path.fixed = <4> ---- <1> <<_sink_key_string,String>>, <<_sink_key_bytes,bytes>>, or <<_sink_value_avro,Avro>> -<2> <<_sink_value_string,String>>, <<_sink_value_bytes,bytes>>, or <<_sink_value_avro,Avro>>. +<2> <<_sink_value_string,String>>, <<_sink_value_bytes,bytes>>, or <<_sink_value_avro,Avro>>. If value is null, the JSON object at the specified path will be deleted. If no path is set, the entire key will be deleted. <3> <<_sink_header_json_path,Header JSON Path>> <4> <<_sink_fixed_json_path,Fixed JSON Path>> -If value is null, the JSON object at the specified path will be deleted. If no path is set, the entire key will be deleted. -+ + + The `JSONMERGE` command has the same functionality as `JSONSET` but with the added ability to specify a JSON path. [[_sink_stream]] From bcc118071ffec7eb77799c24b77c38b6f0ed4234 Mon Sep 17 00:00:00 2001 From: Mathias Weiss - IGS Extern Date: Wed, 10 Jul 2024 17:50:52 +0200 Subject: [PATCH 8/9] updated JSON path delete behavior in documentation --- docs/guide/src/docs/asciidoc/sink.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/guide/src/docs/asciidoc/sink.adoc b/docs/guide/src/docs/asciidoc/sink.adoc index 2f945ed..b2ef4a8 100644 --- a/docs/guide/src/docs/asciidoc/sink.adoc +++ b/docs/guide/src/docs/asciidoc/sink.adoc @@ -96,7 +96,7 @@ redis.json.path =
<3> redis.json.path.fixed = <4> ---- <1> <<_sink_key_string,String>>, <<_sink_key_bytes,bytes>>, or <<_sink_value_avro,Avro>> -<2> <<_sink_value_string,String>>, <<_sink_value_bytes,bytes>>, or <<_sink_value_avro,Avro>>. If value is null, the JSON object at the specified path will be deleted. If no path is set, the entire key will be deleted. +<2> <<_sink_value_string,String>>, <<_sink_value_bytes,bytes>>, or <<_sink_value_avro,Avro>>. If value is null, the entire key will be deleted. If a JSON path is set, only the JSON object at the specified path will be deleted. <3> <<_sink_header_json_path,Header JSON Path>> <4> <<_sink_fixed_json_path,Fixed JSON Path>> From 8d17d6695f93d73dab17a1a97421f4bb4998dcc4 Mon Sep 17 00:00:00 2001 From: Mathias Weiss - IGS Extern Date: Wed, 10 Jul 2024 18:00:59 +0200 Subject: [PATCH 9/9] chore: Bump version to 0.10.0-beta --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index f374f66..d1d2c33 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.1 +0.10.0-beta