Skip to content

Implement Remove Functionality for SADD and JsonSet Operation #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.1
0.10.0-beta
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.redis.kafka.connect.operation;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.redis.lettucemod.api.async.RedisJSONAsyncCommands;
import com.redis.spring.batch.writer.operation.AbstractKeyWriteOperation;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisKeyAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class JsonMerge<K, V, T> extends AbstractKeyWriteOperation<K, V, T> {
private static final Logger log = LoggerFactory.getLogger(JsonSet.class);

public static final String ROOT_PATH = "$";
private static final Function<Object, String> DEFAULT_PATH_FUNCTION = t -> ROOT_PATH;

private Function<T, String> pathFunction;
private Function<T, V> valueFunction;
private Function<T, Boolean> conditionFunction;

private final ObjectMapper mapper;

public JsonMerge() {
// Set default path function
this.pathFunction = (Function<T, String>) DEFAULT_PATH_FUNCTION;
this.mapper = new ObjectMapper();
this.mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
}

public void setPath(String path) {
this.pathFunction = t -> path;
}

public void setPathFunction(Function<T, String> pathFunction) {
this.pathFunction = pathFunction;
}

public void setValueFunction(Function<T, V> valueFunction) {
this.valueFunction = valueFunction;
}

public void setConditionFunction(Function<T, Boolean> conditionFunction) {
this.conditionFunction = conditionFunction;
}

protected RedisFuture<String> execute(BaseRedisAsyncCommands<K, V> commands, T item, K key) {
try {
String path = determinePath(item);
logPath(path);
V value = this.valueFunction.apply(item);
log.info("Value: {}", value);

if (conditionFunction.apply(item)) {
if (isPathSet()) {
return deleteJsonPath(commands, key, path);
} else {
return deleteKey(commands, key);
}
}
System.out.println("isPathSet() "+isPathSet());
// Perform JSON operation based on whether path is set
if (isPathSet()) {

return performJsonMerge(commands, key, path, value);
} else {
return performJsonSet(commands, key, value);
}
} catch (JsonProcessingException e) {
log.error("Error processing JSON", e);
return null;
} catch (Exception e) {
log.error("Error executing Redis command", e);
return null;
}
}

private RedisFuture<String> deleteKey(BaseRedisAsyncCommands<K, V> commands, K key) {
return ((RedisKeyAsyncCommands) commands).del(key);
}

private RedisFuture<String> deleteJsonPath(BaseRedisAsyncCommands<K, V> commands, K key, String path) {
return ((RedisJSONAsyncCommands) commands).jsonDel(key, path);
}

private String determinePath(T item) {
return this.pathFunction.apply(item);
}

private void logPath(String path) {
if (isPathSet()) {
log.info("Path is set to: {}", path);
} else {
log.info("Path is not set, using default: {}", ROOT_PATH);
}
}

@SuppressWarnings("unchecked")
private RedisFuture<String> performJsonMerge(BaseRedisAsyncCommands<K, V> commands, K key, String path, V value) throws JsonProcessingException {
// Convert empty JSON object
String emptyJson = mapper.writeValueAsString(new Object());
byte[] emptyJsonBytes = emptyJson.getBytes(StandardCharsets.UTF_8);

// Merge empty JSON object first
((RedisJSONAsyncCommands<K, V>) commands).jsonMerge(key, ROOT_PATH, (V) emptyJsonBytes);
// Merge actual value
return ((RedisJSONAsyncCommands<K, V>) commands).jsonMerge(key, path, value);
}

private RedisFuture<String> performJsonSet(BaseRedisAsyncCommands<K, V> commands, K key, V value) throws JsonProcessingException {
return ((RedisJSONAsyncCommands<K, V>) commands).jsonSet(key, ROOT_PATH, value);
}

private boolean isPathSet() {
return this.pathFunction != DEFAULT_PATH_FUNCTION;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.redis.kafka.connect.operation;

import com.redis.lettucemod.api.async.RedisJSONAsyncCommands;
import com.redis.spring.batch.writer.operation.AbstractKeyWriteOperation;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisKeyAsyncCommands;

import java.util.function.Function;

public class JsonSet<K, V, T> extends AbstractKeyWriteOperation<K, V, T> {
public static final String ROOT_PATH = "$";
private Function<T, String> pathFunction = (t) -> {
return "$";
};
private Function<T, V> valueFunction;
private Function<T, Boolean> conditionFunction;

public JsonSet() {
}

public void setPath(String path) {
this.pathFunction = (t) -> {
return path;
};
}

public void setPathFunction(Function<T, String> path) {
this.pathFunction = path;
}

public void setValueFunction(Function<T, V> value) {
this.valueFunction = value;
}

public void setConditionFunction(Function<T, Boolean> function) {
this.conditionFunction = function;
}

protected RedisFuture<String> execute(BaseRedisAsyncCommands<K, V> commands, T item, K key) {
if (conditionFunction.apply(item)) {
return ((RedisKeyAsyncCommands)commands).del(new Object[]{key});
} else {
String path = (String)this.pathFunction.apply(item);
V value = this.valueFunction.apply(item);
return ((RedisJSONAsyncCommands)commands).jsonSet(key, path, value);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.redis.kafka.connect.operation;

import com.redis.spring.batch.writer.operation.AbstractKeyWriteOperation;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisSetAsyncCommands;
import java.util.function.Function;

public class Sadd<K, V, T> extends AbstractKeyWriteOperation<K, V, T> {
private Function<T, V> valueFunction;
private Function<T, Boolean> conditionFunction;

public Sadd() {
}

public void setValueFunction(Function<T, V> function) {
this.valueFunction = function;
}

public void setConditionFunction(Function<T, Boolean> function) {
this.conditionFunction = function;
}

protected RedisFuture<Long> execute(BaseRedisAsyncCommands<K, V> commands, T item, K key) {
V value = valueFunction.apply(item);
if (conditionFunction.apply(item)) {
return ((RedisSetAsyncCommands<K, V>) commands).srem(key, value);
} else {
return ((RedisSetAsyncCommands<K, V>) commands).sadd(key, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class RedisSinkConfig extends RedisConfig {

public enum RedisCommand {
HSET, JSONSET, TSADD, SET, XADD, LPUSH, RPUSH, SADD, ZADD, DEL
HSET, JSONSET, JSONMERGE, TSADD, SET, XADD, LPUSH, RPUSH, SADD, ZADD, DEL
}

public static final RedisSinkConfigDef CONFIG = new RedisSinkConfigDef();
Expand All @@ -44,6 +44,10 @@ public enum RedisCommand {

private final Duration waitTimeout;

private final String jsonPath;

private final String fixedJsonPath;

public RedisSinkConfig(Map<?, ?> originals) {
super(new RedisSinkConfigDef(), originals);
String charsetName = getString(RedisSinkConfigDef.CHARSET_CONFIG).trim();
Expand All @@ -54,6 +58,14 @@ public RedisSinkConfig(Map<?, ?> originals) {
multiExec = Boolean.TRUE.equals(getBoolean(RedisSinkConfigDef.MULTIEXEC_CONFIG));
waitReplicas = getInt(RedisSinkConfigDef.WAIT_REPLICAS_CONFIG);
waitTimeout = Duration.ofMillis(getLong(RedisSinkConfigDef.WAIT_TIMEOUT_CONFIG));

if (command == RedisCommand.JSONMERGE) {
jsonPath = getString(RedisSinkConfigDef.JSON_PATH_CONFIG).trim();
fixedJsonPath = getString(RedisSinkConfigDef.FIXED_JSON_PATH_CONFIG).trim();
} else {
jsonPath = null;
fixedJsonPath = null;
}
}

public Charset getCharset() {
Expand Down Expand Up @@ -84,6 +96,15 @@ public Duration getWaitTimeout() {
return waitTimeout;
}

public String getJsonPath() {
return jsonPath;
}

public String getFixedJsonPath() {
return fixedJsonPath;
}


@Override
public int hashCode() {
final int prime = 31;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ public class RedisSinkConfigDef extends RedisConfigDef {

public static final RedisCommand COMMAND_DEFAULT = RedisCommand.XADD;

public static final String JSON_PATH_CONFIG = "redis.json.path";

public static final String JSON_PATH_DEFAULT = "$";

public static final String JSON_PATH_DOC = "The JSON attribute in the header of the Kafka message based on which the JSON path will be dynamically set.";

public static final String FIXED_JSON_PATH_CONFIG = "redis.json.path.fixed";

public static final String FIXED_JSON_PATH_DEFAULT = "$";

public static final String FIXED_JSON_PATH_DOC = "The fixed JSON path to set within the value in the header of the Kafka message. This path will be used if the dynamic path is not present.";

public static final String COMMAND_DOC = "Destination data structure: "
+ String.join(",", Stream.of(RedisCommand.values()).map(RedisCommand::name).toArray(String[]::new));

Expand All @@ -83,6 +95,8 @@ private void define() {
define(MULTIEXEC_CONFIG, Type.BOOLEAN, MULTIEXEC_DEFAULT, Importance.MEDIUM, MULTIEXEC_DOC);
define(WAIT_REPLICAS_CONFIG, Type.INT, WAIT_REPLICAS_DEFAULT, Importance.MEDIUM, WAIT_REPLICAS_DOC);
define(WAIT_TIMEOUT_CONFIG, Type.LONG, WAIT_TIMEOUT_DEFAULT, Importance.MEDIUM, WAIT_TIMEOUT_DOC);
define(JSON_PATH_CONFIG, Type.STRING, JSON_PATH_DEFAULT, Importance.MEDIUM, JSON_PATH_DOC);
define(FIXED_JSON_PATH_CONFIG, Type.STRING, FIXED_JSON_PATH_DEFAULT, Importance.MEDIUM, FIXED_JSON_PATH_DOC);
}

@Override
Expand All @@ -92,6 +106,17 @@ public Map<String, ConfigValue> validateAll(Map<String, String> props) {
return results;
}
RedisCommand command = redisCommand(props);

// Ensure JSON_PATH_CONFIG and FIXED_JSON_PATH_CONFIG are only set if the command is JSONMERGE
if (command != RedisCommand.JSONMERGE) {
if (props.containsKey(JSON_PATH_CONFIG)) {
results.get(JSON_PATH_CONFIG).addErrorMessage("The JSON path configuration is not allowed unless the command is JSONMERGE.");
}
if (props.containsKey(FIXED_JSON_PATH_CONFIG)) {
results.get(FIXED_JSON_PATH_CONFIG).addErrorMessage("The fixed JSON path configuration is not allowed unless the command is JSONMERGE.");
}
}

String multiexec = props.getOrDefault(MULTIEXEC_CONFIG, MULTIEXEC_DEFAULT).trim();
if (multiexec.equalsIgnoreCase("true") && !MULTI_EXEC_COMMANDS.contains(command)) {
String supportedTypes = String.join(", ", MULTI_EXEC_COMMANDS.stream().map(Enum::name).toArray(String[]::new));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
Expand All @@ -56,10 +58,13 @@
import com.redis.spring.batch.writer.WriteOperation;
import com.redis.spring.batch.writer.operation.Del;
import com.redis.spring.batch.writer.operation.Hset;
import com.redis.spring.batch.writer.operation.JsonSet;
//import com.redis.spring.batch.writer.operation.JsonSet;
import com.redis.kafka.connect.operation.JsonSet;
import com.redis.kafka.connect.operation.JsonMerge;
import com.redis.spring.batch.writer.operation.Lpush;
import com.redis.spring.batch.writer.operation.Rpush;
import com.redis.spring.batch.writer.operation.Sadd;
//import com.redis.spring.batch.writer.operation.Sadd;
import com.redis.kafka.connect.operation.Sadd;
import com.redis.spring.batch.writer.operation.Set;
import com.redis.spring.batch.writer.operation.TsAdd;
import com.redis.spring.batch.writer.operation.Xadd;
Expand Down Expand Up @@ -166,7 +171,15 @@ private WriteOperation<byte[], byte[], SinkRecord> operation() {
JsonSet<byte[], byte[], SinkRecord> jsonSet = new JsonSet<>();
jsonSet.setKeyFunction(this::key);
jsonSet.setValueFunction(this::jsonValue);
jsonSet.setConditionFunction(this::isNullValue);
return jsonSet;
case JSONMERGE:
JsonMerge<byte[], byte[], SinkRecord> jsonMerge = new JsonMerge<>();
jsonMerge.setKeyFunction(this::key);
jsonMerge.setValueFunction(this::jsonValue);
jsonMerge.setConditionFunction(this::isNullValue);
jsonMerge.setPathFunction(this::determineJsonPath);
return jsonMerge;
case SET:
Set<byte[], byte[], SinkRecord> set = new Set<>();
set.setKeyFunction(this::key);
Expand All @@ -191,6 +204,7 @@ private WriteOperation<byte[], byte[], SinkRecord> operation() {
Sadd<byte[], byte[], SinkRecord> sadd = new Sadd<>();
sadd.setKeyFunction(this::collectionKey);
sadd.setValueFunction(this::member);
sadd.setConditionFunction(this::isNullValue);
return sadd;
case TSADD:
TsAdd<byte[], byte[], SinkRecord> tsAdd = new TsAdd<>();
Expand All @@ -211,6 +225,10 @@ private WriteOperation<byte[], byte[], SinkRecord> operation() {
}
}

private boolean isNullValue(SinkRecord sinkRecord) {
return sinkRecord.value() == null;
}

private byte[] value(SinkRecord sinkRecord) {
return bytes("value", sinkRecord.value());
}
Expand Down Expand Up @@ -307,6 +325,33 @@ private Map<byte[], byte[]> map(SinkRecord sinkRecord) {
throw new ConnectException("Unsupported source value type: " + sinkRecord.valueSchema().type().name());
}

private String determineJsonPath(SinkRecord record) {
try {
Headers headers = record.headers();
String jsonPath = getJsonPathFromHeader(headers, config.getJsonPath());

if (jsonPath == null) {
return config.getFixedJsonPath();
}

return jsonPath;
} catch (Exception e) {
log.error("Error determining JSON path: ", e);
return config.getFixedJsonPath();
}
}

private String getJsonPathFromHeader(Headers headers, String path) {
for (Header header : headers) {
System.out.println("path "+path);
System.out.println("header "+header);
if (header.key().equals(path)) {
return header.value().toString();
}
}
return null;
}

@Override
public void stop() {
if (writer != null) {
Expand Down
Loading