diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..37e649f --- /dev/null +++ b/.editorconfig @@ -0,0 +1,4 @@ +root = true +[*.java] +indent_style = space +indent_size = 2 diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java index 89b3c65..7326dda 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java @@ -27,15 +27,14 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.data.Timestamp; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.stream.Collectors; + @Title("TimestampNowField") @Description("This transformation is used to set a field with the current timestamp of the system running the " + "transformation.") @@ -66,25 +65,20 @@ public void close() { Map schemaCache = new HashMap<>(); - static boolean isTimestampSchema(Schema schema) { - return (Timestamp.SCHEMA.type() == schema.type() && Timestamp.SCHEMA.name().equals(schema.name())); - } - @Override protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) { - Date timestamp = new Date(this.time.milliseconds()); - + Object timestamp = getFormattedTimestamp(); Schema outputSchema = schemaCache.computeIfAbsent(inputSchema, schema -> { Collection replaceFields = schema.fields().stream() .filter(f -> this.config.fields.contains(f.name())) - .filter(f -> !isTimestampSchema(f.schema())) + .filter(f -> !this.config.targetType.isMatchingSchema(f.schema())) .map(Field::name) .collect(Collectors.toList()); SchemaBuilder builder = SchemaBuilders.of(schema, replaceFields); this.config.fields.forEach(timestampField -> { Field existingField = builder.field(timestampField); if (null == existingField) { - builder.field(timestampField, Timestamp.SCHEMA); + builder.field(timestampField, this.config.targetType.getSchema()); } }); return builder.build(); @@ -98,10 +92,18 @@ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inpu return new SchemaAndValue(outputSchema, output); } + private Object getFormattedTimestamp() { + long desiredTimeInMillis = this.time.milliseconds(); + if (config.addAmount > 0) { + desiredTimeInMillis += config.addAmount * config.addChronoUnit.getDuration().toMillis(); + } + return this.config.targetType.getFormattedTimestamp(desiredTimeInMillis); + } + @Override protected SchemaAndValue processMap(R record, Map input) { Map result = new LinkedHashMap<>(input); - Date timestamp = new Date(this.time.milliseconds()); + Object timestamp = getFormattedTimestamp(); this.config.fields.forEach(field -> result.put(field, timestamp)); return new SchemaAndValue(null, result); } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java index f7370c5..6989f88 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java @@ -17,21 +17,42 @@ import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; +import com.github.jcustenborder.kafka.connect.utils.config.validators.ValidChronoUnit; +import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import java.time.temporal.ChronoUnit; import java.util.Map; import java.util.Set; class TimestampNowFieldConfig extends AbstractConfig { public static final String FIELDS_CONF = "fields"; + public static final String ADD_AMOUNT_CONF = "add.amount"; + public static final String ADD_CHRONO_UNIT_CONF = "add.chronounit"; + + public static final String TARGET_TYPE_CONF = "target.type"; + public static final String FIELDS_DOC = "The field(s) that will be inserted with the timestamp of the system."; + public static final String ADD_AMOUNT_DOC = "how many of the chosen ChronoUnits to add on top of the timestamp of the system."; + + public static final String ADD_CHRONO_UNIT_DOC = "String representation of the ChronoUnit to add, eg: 'DAYS'"; + + public static final String TARGET_TYPE_DOC = "The desired timestamp representation: Unix, Date"; + public final Set fields; + public final Long addAmount; + public final ChronoUnit addChronoUnit; + + public final TimestampNowFieldTargetType targetType; public TimestampNowFieldConfig(Map originals) { super(config(), originals); this.fields = ConfigUtils.getSet(this, FIELDS_CONF); + this.addAmount = getLong(ADD_AMOUNT_CONF); + this.addChronoUnit = ChronoUnit.valueOf(getString(ADD_CHRONO_UNIT_CONF)); + this.targetType = ConfigUtils.getEnum(TimestampNowFieldTargetType.class, this, TARGET_TYPE_CONF); } public static ConfigDef config() { @@ -41,6 +62,26 @@ public static ConfigDef config() { .documentation(FIELDS_DOC) .importance(ConfigDef.Importance.HIGH) .build() + ).define( + ConfigKeyBuilder.of(ADD_AMOUNT_CONF, ConfigDef.Type.LONG) + .documentation(ADD_AMOUNT_DOC) + .importance(ConfigDef.Importance.LOW) + .defaultValue("0") + .build() + ).define( + ConfigKeyBuilder.of(ADD_CHRONO_UNIT_CONF, ConfigDef.Type.STRING) + .documentation(ADD_CHRONO_UNIT_DOC) + .importance(ConfigDef.Importance.LOW) + .defaultValue("DAYS") + .validator(new ValidChronoUnit()) + .build() + ).define( + ConfigKeyBuilder.of(TARGET_TYPE_CONF, ConfigDef.Type.STRING) + .documentation(TARGET_TYPE_DOC) + .importance(ConfigDef.Importance.LOW) + .defaultValue("DATE") + .validator(Validators.validEnum(TimestampNowFieldTargetType.class)) + .build() ); } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java new file mode 100644 index 0000000..e8efadb --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java @@ -0,0 +1,63 @@ +/** + * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.transform.common; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Timestamp; + +import java.time.Instant; +import java.util.Date; + +public enum TimestampNowFieldTargetType { + DATE { + @Override + boolean isMatchingSchema(Schema schema) { + return Timestamp.SCHEMA.type() == getSchema().type() && Timestamp.SCHEMA.name().equals(schema.name()); + } + + @Override + Schema getSchema() { + return Timestamp.SCHEMA; + } + + @Override + Object getFormattedTimestamp(long timeInMillis) { + return new Date(timeInMillis); + } + }, + UNIX { + @Override + boolean isMatchingSchema(Schema schema) { + return Schema.Type.INT64 == schema.type() && null == schema.name(); + } + + @Override + Schema getSchema() { + return Schema.INT64_SCHEMA; + } + + @Override + Object getFormattedTimestamp(long timeInMillis) { + return Instant.ofEpochMilli(timeInMillis).getEpochSecond(); + } + }; + + abstract boolean isMatchingSchema(Schema schema); + + abstract Schema getSchema(); + + abstract Object getFormattedTimestamp(long timeInMillis); +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java b/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java new file mode 100644 index 0000000..fe52864 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java @@ -0,0 +1,62 @@ +/** + * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.utils.config.validators; + +import com.google.common.base.Joiner; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class ValidChronoUnit implements ConfigDef.Validator { + List validOptions = Arrays.stream(ChronoUnit.values()).map(ChronoUnit::name).collect(Collectors.toList()); + + @Override + public void ensureValid(String s, Object o) { + if (o instanceof String) { + if (!validOptions.contains(o)) { + throw new ConfigException( + s, + String.format( + "'%s' is not a valid value for %s. Valid values are %s.", + o, + ChronoUnit.class.getSimpleName(), + Joiner.on(", ").join(validOptions) + ) + ); + } + } else if (o instanceof List) { + List list = (List) o; + for (Object i : list) { + ensureValid(s, i); + } + } else { + throw new ConfigException( + s, + o, + "Must be a String or List" + ); + } + } + + @Override + public String toString() { + return "Matches: ``" + Joiner.on("``, ``").join(this.validOptions) + "``"; + } +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java index 96fa39a..b442b8a 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java @@ -9,40 +9,66 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Date; import java.util.Map; import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TimestampNowFieldTest { TimestampNowField transformation; - Date timestamp = new Date(1586963336123L); + static Instant timestampNow = Instant.parse("2023-06-02T10:06:45+00:00"); + static Date timestampNowAsDate = Date.from(timestampNow); @BeforeEach public void beforeEach() { this.transformation = new TimestampNowField.Value<>(); - Date timestamp = new Date(1586963336123L); Time time = mock(Time.class); - when(time.milliseconds()).thenReturn(timestamp.getTime()); + when(time.milliseconds()).thenReturn(timestampNow.toEpochMilli()); this.transformation.time = time; this.transformation.configure( ImmutableMap.of(TimestampNowFieldConfig.FIELDS_CONF, "timestamp") ); } + @BeforeEach public void afterEach() { this.transformation.close(); } - @Test - public void structFieldMissing() { + enum StructUseCases { + DATE(TimestampNowFieldTargetType.DATE, Timestamp.SCHEMA, timestampNowAsDate), + UNIX(TimestampNowFieldTargetType.UNIX, Schema.INT64_SCHEMA, timestampNow.getEpochSecond()); + + public final TimestampNowFieldTargetType targetType; + public final Schema schema; + public final Object value; + + + StructUseCases(TimestampNowFieldTargetType targetType, Schema schema, Object value) { + this.targetType = targetType; + this.schema = schema; + this.value = value; + } + } + + @ParameterizedTest + @EnumSource(StructUseCases.class) + public void structFieldMissing(StructUseCases useCase) { + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.TARGET_TYPE_CONF, useCase.targetType.name() + ) + ); final Schema inputSchema = SchemaBuilder.struct() .name("something") .field("firstName", Schema.STRING_SCHEMA) @@ -52,7 +78,7 @@ public void structFieldMissing() { .name("something") .field("firstName", Schema.STRING_SCHEMA) .field("lastName", Schema.STRING_SCHEMA) - .field("timestamp", Timestamp.SCHEMA) + .field("timestamp", useCase.schema) .build(); final Struct inputStruct = new Struct(inputSchema) .put("firstName", "example") @@ -60,7 +86,7 @@ public void structFieldMissing() { final Struct expectedStruct = new Struct(expectedSchema) .put("firstName", "example") .put("lastName", "user") - .put("timestamp", timestamp); + .put("timestamp", useCase.value); final SinkRecord input = new SinkRecord( "test", 1, @@ -76,8 +102,16 @@ public void structFieldMissing() { final Struct actualStruct = (Struct) output.value(); assertStruct(expectedStruct, actualStruct); } - @Test - public void structFieldExists() { + + @ParameterizedTest + @EnumSource(StructUseCases.class) + public void structFieldExists(StructUseCases useCase) { + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.TARGET_TYPE_CONF, useCase.targetType.name() + ) + ); final Schema inputSchema = SchemaBuilder.struct() .name("something") .field("firstName", Schema.STRING_SCHEMA) @@ -88,7 +122,7 @@ public void structFieldExists() { .name("something") .field("firstName", Schema.STRING_SCHEMA) .field("lastName", Schema.STRING_SCHEMA) - .field("timestamp", Timestamp.SCHEMA) + .field("timestamp", useCase.schema) .build(); final Struct inputStruct = new Struct(inputSchema) .put("firstName", "example") @@ -96,7 +130,7 @@ public void structFieldExists() { final Struct expectedStruct = new Struct(expectedSchema) .put("firstName", "example") .put("lastName", "user") - .put("timestamp", timestamp); + .put("timestamp", useCase.value); final SinkRecord input = new SinkRecord( "test", 1, @@ -112,8 +146,60 @@ public void structFieldExists() { final Struct actualStruct = (Struct) output.value(); assertStruct(expectedStruct, actualStruct); } + @Test - public void structFieldMismatch() { + public void structAddTwoDays() { + Date timestampNowPlusTwoDays = Date.from(timestampNow.plus(2, ChronoUnit.DAYS)); + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.ADD_AMOUNT_CONF, "2", + TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "DAYS" + ) + ); + final Schema inputSchema = SchemaBuilder.struct() + .name("something") + .field("firstName", Schema.STRING_SCHEMA) + .field("lastName", Schema.STRING_SCHEMA) + .build(); + final Schema expectedSchema = SchemaBuilder.struct() + .name("something") + .field("firstName", Schema.STRING_SCHEMA) + .field("lastName", Schema.STRING_SCHEMA) + .field("timestamp", Timestamp.SCHEMA) + .build(); + final Struct inputStruct = new Struct(inputSchema) + .put("firstName", "example") + .put("lastName", "user"); + final Struct expectedStruct = new Struct(expectedSchema) + .put("firstName", "example") + .put("lastName", "user") + .put("timestamp", timestampNowPlusTwoDays); + final SinkRecord input = new SinkRecord( + "test", + 1, + null, + null, + inputSchema, + inputStruct, + 1234L + ); + final SinkRecord output = this.transformation.apply(input); + assertNotNull(output, "output should not be null."); + assertTrue(output.value() instanceof Struct, "value should be a struct"); + final Struct actualStruct = (Struct) output.value(); + assertStruct(expectedStruct, actualStruct); + } + + @ParameterizedTest + @EnumSource(StructUseCases.class) + public void structFieldMismatch(StructUseCases useCase) { + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.TARGET_TYPE_CONF, useCase.targetType.name() + ) + ); final Schema inputSchema = SchemaBuilder.struct() .name("something") .field("firstName", Schema.STRING_SCHEMA) @@ -124,7 +210,7 @@ public void structFieldMismatch() { .name("something") .field("firstName", Schema.STRING_SCHEMA) .field("lastName", Schema.STRING_SCHEMA) - .field("timestamp", Timestamp.SCHEMA) + .field("timestamp", useCase.schema) .build(); final Struct inputStruct = new Struct(inputSchema) .put("firstName", "example") @@ -132,7 +218,7 @@ public void structFieldMismatch() { final Struct expectedStruct = new Struct(expectedSchema) .put("firstName", "example") .put("lastName", "user") - .put("timestamp", timestamp); + .put("timestamp", useCase.value); final SinkRecord input = new SinkRecord( "test", 1, @@ -152,7 +238,7 @@ public void structFieldMismatch() { @Test public void mapFieldMissing() { final Map expected = ImmutableMap.of( - "firstName", "example", "lastName", "user", "timestamp", timestamp + "firstName", "example", "lastName", "user", "timestamp", timestampNowAsDate ); final SinkRecord input = new SinkRecord( "test", @@ -170,6 +256,63 @@ public void mapFieldMissing() { assertEquals(expected, actual); } + @Test + public void mapAddThreeHours() { + Date timestampPlusThreeHours = Date.from(timestampNow.plus(3, ChronoUnit.HOURS)); + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.ADD_AMOUNT_CONF, "3", + TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "HOURS" + ) + ); + final Map expected = ImmutableMap.of( + "firstName", "example", "lastName", "user", "timestamp", timestampPlusThreeHours + ); + final SinkRecord input = new SinkRecord( + "test", + 1, + null, + null, + null, + ImmutableMap.of("firstName", "example", "lastName", "user"), + 1234L + ); + final SinkRecord output = this.transformation.apply(input); + assertNotNull(output, "output should not be null."); + assertTrue(output.value() instanceof Map, "value should be a struct"); + final Map actual = (Map) output.value(); + assertEquals(expected, actual); + } + + @Test + public void mapFormattedAsUnix() { + long timestampFormattedAsUnix = timestampNow.getEpochSecond(); + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.TARGET_TYPE_CONF, "UNIX" + ) + ); + final Map expected = ImmutableMap.of( + "firstName", "example", "lastName", "user", "timestamp", timestampFormattedAsUnix + ); + final SinkRecord input = new SinkRecord( + "test", + 1, + null, + null, + null, + ImmutableMap.of("firstName", "example", "lastName", "user"), + 1234L + ); + final SinkRecord output = this.transformation.apply(input); + assertNotNull(output, "output should not be null."); + assertTrue(output.value() instanceof Map, "value should be a struct"); + final Map actual = (Map) output.value(); + assertEquals(expected, actual); + } + @Test public void config() { assertNotNull(this.transformation.config());