From 2ee2efdd727c8c58caf9e34c1634c9289a720fd1 Mon Sep 17 00:00:00 2001 From: "jeffrey.vanderlaan" Date: Tue, 30 May 2023 14:12:03 +0200 Subject: [PATCH 1/6] Add options to add configurable amount of ChronoUnits to the timestamp field and output as Unix time in seconds. --- .../transform/common/TimestampNowField.java | 42 ++++- .../common/TimestampNowFieldConfig.java | 46 ++++++ .../config/validators/ValidChronoUnit.java | 47 ++++++ .../common/TimestampNowFieldTest.java | 156 +++++++++++++++++- 4 files changed, 283 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java index 89b3c65..c5bdb22 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java @@ -29,6 +29,8 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Timestamp; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.Date; import java.util.HashMap; @@ -72,19 +74,34 @@ static boolean isTimestampSchema(Schema schema) { @Override protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) { - Date timestamp = new Date(this.time.milliseconds()); - + Object timestamp = getFormattedTimestamp(); Schema outputSchema = schemaCache.computeIfAbsent(inputSchema, schema -> { Collection replaceFields = schema.fields().stream() .filter(f -> this.config.fields.contains(f.name())) - .filter(f -> !isTimestampSchema(f.schema())) + .filter(f -> { + switch (this.config.targetType) { + default: + case Date: + return !isTimestampSchema(f.schema()); + case Unix: + return f.schema().type() != Schema.Type.INT64; + } + }) .map(Field::name) .collect(Collectors.toList()); SchemaBuilder builder = SchemaBuilders.of(schema, replaceFields); this.config.fields.forEach(timestampField -> { Field existingField = builder.field(timestampField); if (null == existingField) { - builder.field(timestampField, Timestamp.SCHEMA); + switch (config.targetType) { + default: + case Date: + builder.field(timestampField, Timestamp.SCHEMA); + break; + case Unix: + builder.field(timestampField, Schema.INT64_SCHEMA); + break; + } } }); return builder.build(); @@ -98,10 +115,25 @@ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inpu return new SchemaAndValue(outputSchema, output); } + private Object getFormattedTimestamp() { + long desiredTime = this.time.milliseconds(); + if (config.addAmount > 0) { + desiredTime += config.addChronoUnit.getDuration().toMillis(); + } + Instant desiredInstant = Instant.ofEpochMilli(desiredTime); + switch (config.targetType) { + default: + case Date: + return Date.from(desiredInstant); + case Unix: + return desiredInstant.getEpochSecond(); + } + } + @Override protected SchemaAndValue processMap(R record, Map input) { Map result = new LinkedHashMap<>(input); - Date timestamp = new Date(this.time.milliseconds()); + Object timestamp = getFormattedTimestamp(); this.config.fields.forEach(field -> result.put(field, timestamp)); return new SchemaAndValue(null, result); } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java index f7370c5..9d03c6c 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java @@ -17,21 +17,42 @@ import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; +import com.github.jcustenborder.kafka.connect.utils.config.validators.ValidChronoUnit; +import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import java.time.temporal.ChronoUnit; import java.util.Map; import java.util.Set; class TimestampNowFieldConfig extends AbstractConfig { public static final String FIELDS_CONF = "fields"; + public static final String ADD_AMOUNT_CONF = "add.amount"; + public static final String ADD_CHRONO_UNIT_CONF = "add.chronounit"; + + public static final String TARGET_TYPE_CONF = "target.type"; + public static final String FIELDS_DOC = "The field(s) that will be inserted with the timestamp of the system."; + public static final String ADD_AMOUNT_DOC = "how many of the chosen ChronoUnits to add on top of the timestamp of the system."; + + public static final String ADD_CHRONO_UNIT_DOC = "String representation of the ChronoUnit to add, eg: 'DAYS'"; + + public static final String TARGET_TYPE_DOC = "The desired timestamp representation: Unix, Date"; + public final Set fields; + public final Long addAmount; + public final ChronoUnit addChronoUnit; + + public final TargetType targetType; public TimestampNowFieldConfig(Map originals) { super(config(), originals); this.fields = ConfigUtils.getSet(this, FIELDS_CONF); + this.addAmount = getLong(ADD_AMOUNT_CONF); + this.addChronoUnit = ChronoUnit.valueOf(getString(ADD_CHRONO_UNIT_CONF)); + this.targetType = ConfigUtils.getEnum(TargetType.class, this, TARGET_TYPE_CONF); } public static ConfigDef config() { @@ -41,6 +62,31 @@ public static ConfigDef config() { .documentation(FIELDS_DOC) .importance(ConfigDef.Importance.HIGH) .build() + ).define( + ConfigKeyBuilder.of(ADD_AMOUNT_CONF, ConfigDef.Type.LONG) + .documentation(ADD_AMOUNT_DOC) + .importance(ConfigDef.Importance.LOW) + .defaultValue("0") + .build() + ).define( + ConfigKeyBuilder.of(ADD_CHRONO_UNIT_CONF, ConfigDef.Type.STRING) + .documentation(ADD_CHRONO_UNIT_DOC) + .importance(ConfigDef.Importance.LOW) + .defaultValue("DAYS") + .validator(new ValidChronoUnit()) + .build() + ).define( + ConfigKeyBuilder.of(TARGET_TYPE_CONF, ConfigDef.Type.STRING) + .documentation(TARGET_TYPE_DOC) + .importance(ConfigDef.Importance.LOW) + .defaultValue("Date") + .validator(Validators.validEnum(TargetType.class)) + .build() ); } + + public enum TargetType { + Date(), + Unix(), + } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java b/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java new file mode 100644 index 0000000..96726f6 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java @@ -0,0 +1,47 @@ +package com.github.jcustenborder.kafka.connect.utils.config.validators; + +import com.google.common.base.Joiner; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class ValidChronoUnit implements ConfigDef.Validator { + List validOptions = Arrays.stream(ChronoUnit.values()).map(ChronoUnit::name).collect(Collectors.toList()); + + @Override + public void ensureValid(String s, Object o) { + if (o instanceof String) { + if (!validOptions.contains(o)) { + throw new ConfigException( + s, + String.format( + "'%s' is not a valid value for %s. Valid values are %s.", + o, + ChronoUnit.class.getSimpleName(), + Joiner.on(", ").join(validOptions) + ) + ); + } + } else if (o instanceof List) { + List list = (List) o; + for (Object i : list) { + ensureValid(s, i); + } + } else { + throw new ConfigException( + s, + o, + "Must be a String or List" + ); + } + } + + @Override + public String toString() { + return "Matches: ``" + Joiner.on("``, ``").join(this.validOptions) + "``"; + } +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java index 96fa39a..a9b1ea0 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java @@ -10,13 +10,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.temporal.ChronoUnit; import java.util.Date; import java.util.Map; import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -110,8 +109,100 @@ public void structFieldExists() { assertNotNull(output, "output should not be null."); assertTrue(output.value() instanceof Struct, "value should be a struct"); final Struct actualStruct = (Struct) output.value(); + System.out.println(actualStruct); assertStruct(expectedStruct, actualStruct); } + @Test + public void structFieldMissingAddOneDay() { + Date timestampPlusOneDay = Date.from(timestamp.toInstant().plus(1, ChronoUnit.DAYS)); + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.ADD_AMOUNT_CONF, "1", + TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "DAYS" + ) + ); + final Schema inputSchema = SchemaBuilder.struct() + .name("something") + .field("firstName", Schema.STRING_SCHEMA) + .field("lastName", Schema.STRING_SCHEMA) + .build(); + final Schema expectedSchema = SchemaBuilder.struct() + .name("something") + .field("firstName", Schema.STRING_SCHEMA) + .field("lastName", Schema.STRING_SCHEMA) + .field("timestamp", Timestamp.SCHEMA) + .build(); + final Struct inputStruct = new Struct(inputSchema) + .put("firstName", "example") + .put("lastName", "user"); + final Struct expectedStruct = new Struct(expectedSchema) + .put("firstName", "example") + .put("lastName", "user") + .put("timestamp", timestampPlusOneDay); + final SinkRecord input = new SinkRecord( + "test", + 1, + null, + null, + inputSchema, + inputStruct, + 1234L + ); + final SinkRecord output = this.transformation.apply(input); + assertNotNull(output, "output should not be null."); + assertTrue(output.value() instanceof Struct, "value should be a struct"); + final Struct actualStruct = (Struct) output.value(); + System.out.println(actualStruct); + assertStruct(expectedStruct, actualStruct); + } + + @Test + public void structFieldMissingAddOneDayFormattedAsUnix() { + long timestampPlusOneDayFormattedAsUnix = timestamp.toInstant().plus(1, ChronoUnit.DAYS).getEpochSecond(); + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.ADD_AMOUNT_CONF, "1", + TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "DAYS", + TimestampNowFieldConfig.TARGET_TYPE_CONF, "Unix" + ) + ); + final Schema inputSchema = SchemaBuilder.struct() + .name("something") + .field("firstName", Schema.STRING_SCHEMA) + .field("lastName", Schema.STRING_SCHEMA) + .build(); + final Schema expectedSchema = SchemaBuilder.struct() + .name("something") + .field("firstName", Schema.STRING_SCHEMA) + .field("lastName", Schema.STRING_SCHEMA) + .field("timestamp", Schema.INT64_SCHEMA) + .build(); + final Struct inputStruct = new Struct(inputSchema) + .put("firstName", "example") + .put("lastName", "user"); + final Struct expectedStruct = new Struct(expectedSchema) + .put("firstName", "example") + .put("lastName", "user") + .put("timestamp", timestampPlusOneDayFormattedAsUnix); + final SinkRecord input = new SinkRecord( + "test", + 1, + null, + null, + inputSchema, + inputStruct, + 1234L + ); + final SinkRecord output = this.transformation.apply(input); + assertNotNull(output, "output should not be null."); + assertTrue(output.value() instanceof Struct, "value should be a struct"); + final Struct actualStruct = (Struct) output.value(); + System.out.println(actualStruct); + assertStruct(expectedStruct, actualStruct); + } + @Test public void structFieldMismatch() { final Schema inputSchema = SchemaBuilder.struct() @@ -170,6 +261,65 @@ public void mapFieldMissing() { assertEquals(expected, actual); } + @Test + public void mapFieldMissingAddOneDay() { + Date timestampPlusOneDay = Date.from(timestamp.toInstant().plus(1, ChronoUnit.DAYS)); + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.ADD_AMOUNT_CONF, "1", + TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "DAYS" + ) + ); + final Map expected = ImmutableMap.of( + "firstName", "example", "lastName", "user", "timestamp", timestampPlusOneDay + ); + final SinkRecord input = new SinkRecord( + "test", + 1, + null, + null, + null, + ImmutableMap.of("firstName", "example", "lastName", "user"), + 1234L + ); + final SinkRecord output = this.transformation.apply(input); + assertNotNull(output, "output should not be null."); + assertTrue(output.value() instanceof Map, "value should be a struct"); + final Map actual = (Map) output.value(); + assertEquals(expected, actual); + } + + @Test + public void mapFieldMissingAddOneDayFormattedAsUnix() { + long timestampPlusOneDayFormattedAsUnix = timestamp.toInstant().plus(1, ChronoUnit.DAYS).getEpochSecond(); + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.ADD_AMOUNT_CONF, "1", + TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "DAYS", + TimestampNowFieldConfig.TARGET_TYPE_CONF, "Unix" + ) + ); + final Map expected = ImmutableMap.of( + "firstName", "example", "lastName", "user", "timestamp", timestampPlusOneDayFormattedAsUnix + ); + final SinkRecord input = new SinkRecord( + "test", + 1, + null, + null, + null, + ImmutableMap.of("firstName", "example", "lastName", "user"), + 1234L + ); + final SinkRecord output = this.transformation.apply(input); + assertNotNull(output, "output should not be null."); + assertTrue(output.value() instanceof Map, "value should be a struct"); + final Map actual = (Map) output.value(); + assertEquals(expected, actual); + } + @Test public void config() { assertNotNull(this.transformation.config()); From d364286f790ecb680c092fe6ec5e04cc35002771 Mon Sep 17 00:00:00 2001 From: "jeffrey.vanderlaan" Date: Tue, 30 May 2023 15:18:44 +0200 Subject: [PATCH 2/6] Cleanup tests, fix issue with unix output and existing fields --- .../transform/common/TimestampNowField.java | 15 +-- .../common/TimestampNowFieldTest.java | 119 ++++++++---------- 2 files changed, 58 insertions(+), 76 deletions(-) diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java index c5bdb22..8c94d4d 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java @@ -22,20 +22,11 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.ConnectRecord; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.data.*; import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.Collection; import java.util.Date; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; @Title("TimestampNowField") @@ -84,7 +75,7 @@ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inpu case Date: return !isTimestampSchema(f.schema()); case Unix: - return f.schema().type() != Schema.Type.INT64; + return f.schema().type() != Schema.Type.INT64 || f.schema().name() != null; } }) .map(Field::name) diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java index a9b1ea0..bd7667b 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java @@ -9,6 +9,8 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.time.temporal.ChronoUnit; import java.util.Date; @@ -22,12 +24,11 @@ public class TimestampNowFieldTest { TimestampNowField transformation; - Date timestamp = new Date(1586963336123L); + static Date timestamp = new Date(1586963336123L); @BeforeEach public void beforeEach() { this.transformation = new TimestampNowField.Value<>(); - Date timestamp = new Date(1586963336123L); Time time = mock(Time.class); when(time.milliseconds()).thenReturn(timestamp.getTime()); this.transformation.time = time; @@ -35,13 +36,37 @@ public void beforeEach() { ImmutableMap.of(TimestampNowFieldConfig.FIELDS_CONF, "timestamp") ); } + @BeforeEach public void afterEach() { this.transformation.close(); } - @Test - public void structFieldMissing() { + enum StructUseCases { + Date(TimestampNowFieldConfig.TargetType.Date, Timestamp.SCHEMA, timestamp), + Unix(TimestampNowFieldConfig.TargetType.Unix, Schema.INT64_SCHEMA, timestamp.toInstant().getEpochSecond()); + + public final TimestampNowFieldConfig.TargetType targetType; + public final Schema schema; + public final Object value; + + + StructUseCases(TimestampNowFieldConfig.TargetType targetType, Schema schema, Object value) { + this.targetType = targetType; + this.schema = schema; + this.value = value; + } + } + + @ParameterizedTest + @EnumSource(StructUseCases.class) + public void structFieldMissing(StructUseCases useCase) { + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.TARGET_TYPE_CONF, useCase.targetType.name() + ) + ); final Schema inputSchema = SchemaBuilder.struct() .name("something") .field("firstName", Schema.STRING_SCHEMA) @@ -51,7 +76,7 @@ public void structFieldMissing() { .name("something") .field("firstName", Schema.STRING_SCHEMA) .field("lastName", Schema.STRING_SCHEMA) - .field("timestamp", Timestamp.SCHEMA) + .field("timestamp", useCase.schema) .build(); final Struct inputStruct = new Struct(inputSchema) .put("firstName", "example") @@ -59,7 +84,7 @@ public void structFieldMissing() { final Struct expectedStruct = new Struct(expectedSchema) .put("firstName", "example") .put("lastName", "user") - .put("timestamp", timestamp); + .put("timestamp", useCase.value); final SinkRecord input = new SinkRecord( "test", 1, @@ -75,8 +100,16 @@ public void structFieldMissing() { final Struct actualStruct = (Struct) output.value(); assertStruct(expectedStruct, actualStruct); } - @Test - public void structFieldExists() { + + @ParameterizedTest + @EnumSource(StructUseCases.class) + public void structFieldExists(StructUseCases useCase) { + this.transformation.configure( + ImmutableMap.of( + TimestampNowFieldConfig.FIELDS_CONF, "timestamp", + TimestampNowFieldConfig.TARGET_TYPE_CONF, useCase.targetType.name() + ) + ); final Schema inputSchema = SchemaBuilder.struct() .name("something") .field("firstName", Schema.STRING_SCHEMA) @@ -87,7 +120,7 @@ public void structFieldExists() { .name("something") .field("firstName", Schema.STRING_SCHEMA) .field("lastName", Schema.STRING_SCHEMA) - .field("timestamp", Timestamp.SCHEMA) + .field("timestamp", useCase.schema) .build(); final Struct inputStruct = new Struct(inputSchema) .put("firstName", "example") @@ -95,7 +128,7 @@ public void structFieldExists() { final Struct expectedStruct = new Struct(expectedSchema) .put("firstName", "example") .put("lastName", "user") - .put("timestamp", timestamp); + .put("timestamp", useCase.value); final SinkRecord input = new SinkRecord( "test", 1, @@ -109,11 +142,11 @@ public void structFieldExists() { assertNotNull(output, "output should not be null."); assertTrue(output.value() instanceof Struct, "value should be a struct"); final Struct actualStruct = (Struct) output.value(); - System.out.println(actualStruct); assertStruct(expectedStruct, actualStruct); } + @Test - public void structFieldMissingAddOneDay() { + public void structAddOneDay() { Date timestampPlusOneDay = Date.from(timestamp.toInstant().plus(1, ChronoUnit.DAYS)); this.transformation.configure( ImmutableMap.of( @@ -153,58 +186,18 @@ public void structFieldMissingAddOneDay() { assertNotNull(output, "output should not be null."); assertTrue(output.value() instanceof Struct, "value should be a struct"); final Struct actualStruct = (Struct) output.value(); - System.out.println(actualStruct); assertStruct(expectedStruct, actualStruct); } - @Test - public void structFieldMissingAddOneDayFormattedAsUnix() { - long timestampPlusOneDayFormattedAsUnix = timestamp.toInstant().plus(1, ChronoUnit.DAYS).getEpochSecond(); + @ParameterizedTest + @EnumSource(StructUseCases.class) + public void structFieldMismatch(StructUseCases useCase) { this.transformation.configure( ImmutableMap.of( TimestampNowFieldConfig.FIELDS_CONF, "timestamp", - TimestampNowFieldConfig.ADD_AMOUNT_CONF, "1", - TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "DAYS", - TimestampNowFieldConfig.TARGET_TYPE_CONF, "Unix" + TimestampNowFieldConfig.TARGET_TYPE_CONF, useCase.targetType.name() ) ); - final Schema inputSchema = SchemaBuilder.struct() - .name("something") - .field("firstName", Schema.STRING_SCHEMA) - .field("lastName", Schema.STRING_SCHEMA) - .build(); - final Schema expectedSchema = SchemaBuilder.struct() - .name("something") - .field("firstName", Schema.STRING_SCHEMA) - .field("lastName", Schema.STRING_SCHEMA) - .field("timestamp", Schema.INT64_SCHEMA) - .build(); - final Struct inputStruct = new Struct(inputSchema) - .put("firstName", "example") - .put("lastName", "user"); - final Struct expectedStruct = new Struct(expectedSchema) - .put("firstName", "example") - .put("lastName", "user") - .put("timestamp", timestampPlusOneDayFormattedAsUnix); - final SinkRecord input = new SinkRecord( - "test", - 1, - null, - null, - inputSchema, - inputStruct, - 1234L - ); - final SinkRecord output = this.transformation.apply(input); - assertNotNull(output, "output should not be null."); - assertTrue(output.value() instanceof Struct, "value should be a struct"); - final Struct actualStruct = (Struct) output.value(); - System.out.println(actualStruct); - assertStruct(expectedStruct, actualStruct); - } - - @Test - public void structFieldMismatch() { final Schema inputSchema = SchemaBuilder.struct() .name("something") .field("firstName", Schema.STRING_SCHEMA) @@ -215,7 +208,7 @@ public void structFieldMismatch() { .name("something") .field("firstName", Schema.STRING_SCHEMA) .field("lastName", Schema.STRING_SCHEMA) - .field("timestamp", Timestamp.SCHEMA) + .field("timestamp", useCase.schema) .build(); final Struct inputStruct = new Struct(inputSchema) .put("firstName", "example") @@ -223,7 +216,7 @@ public void structFieldMismatch() { final Struct expectedStruct = new Struct(expectedSchema) .put("firstName", "example") .put("lastName", "user") - .put("timestamp", timestamp); + .put("timestamp", useCase.value); final SinkRecord input = new SinkRecord( "test", 1, @@ -262,7 +255,7 @@ public void mapFieldMissing() { } @Test - public void mapFieldMissingAddOneDay() { + public void mapAddOneDay() { Date timestampPlusOneDay = Date.from(timestamp.toInstant().plus(1, ChronoUnit.DAYS)); this.transformation.configure( ImmutableMap.of( @@ -291,18 +284,16 @@ public void mapFieldMissingAddOneDay() { } @Test - public void mapFieldMissingAddOneDayFormattedAsUnix() { - long timestampPlusOneDayFormattedAsUnix = timestamp.toInstant().plus(1, ChronoUnit.DAYS).getEpochSecond(); + public void mapFormattedAsUnix() { + long timestampFormattedAsUnix = timestamp.toInstant().getEpochSecond(); this.transformation.configure( ImmutableMap.of( TimestampNowFieldConfig.FIELDS_CONF, "timestamp", - TimestampNowFieldConfig.ADD_AMOUNT_CONF, "1", - TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "DAYS", TimestampNowFieldConfig.TARGET_TYPE_CONF, "Unix" ) ); final Map expected = ImmutableMap.of( - "firstName", "example", "lastName", "user", "timestamp", timestampPlusOneDayFormattedAsUnix + "firstName", "example", "lastName", "user", "timestamp", timestampFormattedAsUnix ); final SinkRecord input = new SinkRecord( "test", From 7cf254262ca991cd738ba83c3e1af2a2b30c2c9c Mon Sep 17 00:00:00 2001 From: "jeffrey.vanderlaan" Date: Tue, 30 May 2023 15:38:25 +0200 Subject: [PATCH 3/6] Apply factory pattern to clean up the code --- .../transform/common/TimestampNowField.java | 44 ++++------------- .../common/TimestampNowFieldConfig.java | 11 ++--- .../common/TimestampNowFieldTargetType.java | 48 +++++++++++++++++++ .../common/TimestampNowFieldTest.java | 8 ++-- 4 files changed, 64 insertions(+), 47 deletions(-) create mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java index 8c94d4d..f2de19c 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java @@ -24,9 +24,10 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.*; -import java.time.Instant; -import java.util.Date; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.stream.Collectors; @Title("TimestampNowField") @@ -59,40 +60,20 @@ public void close() { Map schemaCache = new HashMap<>(); - static boolean isTimestampSchema(Schema schema) { - return (Timestamp.SCHEMA.type() == schema.type() && Timestamp.SCHEMA.name().equals(schema.name())); - } - @Override protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) { Object timestamp = getFormattedTimestamp(); Schema outputSchema = schemaCache.computeIfAbsent(inputSchema, schema -> { Collection replaceFields = schema.fields().stream() .filter(f -> this.config.fields.contains(f.name())) - .filter(f -> { - switch (this.config.targetType) { - default: - case Date: - return !isTimestampSchema(f.schema()); - case Unix: - return f.schema().type() != Schema.Type.INT64 || f.schema().name() != null; - } - }) + .filter(f -> !this.config.targetType.isMatchingSchema(f.schema())) .map(Field::name) .collect(Collectors.toList()); SchemaBuilder builder = SchemaBuilders.of(schema, replaceFields); this.config.fields.forEach(timestampField -> { Field existingField = builder.field(timestampField); if (null == existingField) { - switch (config.targetType) { - default: - case Date: - builder.field(timestampField, Timestamp.SCHEMA); - break; - case Unix: - builder.field(timestampField, Schema.INT64_SCHEMA); - break; - } + builder.field(timestampField, this.config.targetType.getSchema()); } }); return builder.build(); @@ -107,18 +88,11 @@ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inpu } private Object getFormattedTimestamp() { - long desiredTime = this.time.milliseconds(); + long desiredTimeInMillis = this.time.milliseconds(); if (config.addAmount > 0) { - desiredTime += config.addChronoUnit.getDuration().toMillis(); - } - Instant desiredInstant = Instant.ofEpochMilli(desiredTime); - switch (config.targetType) { - default: - case Date: - return Date.from(desiredInstant); - case Unix: - return desiredInstant.getEpochSecond(); + desiredTimeInMillis += config.addChronoUnit.getDuration().toMillis(); } + return this.config.targetType.getFormattedTimestamp(desiredTimeInMillis); } @Override diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java index 9d03c6c..b5513b5 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java @@ -45,14 +45,14 @@ class TimestampNowFieldConfig extends AbstractConfig { public final Long addAmount; public final ChronoUnit addChronoUnit; - public final TargetType targetType; + public final TimestampNowFieldTargetType targetType; public TimestampNowFieldConfig(Map originals) { super(config(), originals); this.fields = ConfigUtils.getSet(this, FIELDS_CONF); this.addAmount = getLong(ADD_AMOUNT_CONF); this.addChronoUnit = ChronoUnit.valueOf(getString(ADD_CHRONO_UNIT_CONF)); - this.targetType = ConfigUtils.getEnum(TargetType.class, this, TARGET_TYPE_CONF); + this.targetType = ConfigUtils.getEnum(TimestampNowFieldTargetType.class, this, TARGET_TYPE_CONF); } public static ConfigDef config() { @@ -80,13 +80,8 @@ public static ConfigDef config() { .documentation(TARGET_TYPE_DOC) .importance(ConfigDef.Importance.LOW) .defaultValue("Date") - .validator(Validators.validEnum(TargetType.class)) + .validator(Validators.validEnum(TimestampNowFieldTargetType.class)) .build() ); } - - public enum TargetType { - Date(), - Unix(), - } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java new file mode 100644 index 0000000..3cd77cc --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java @@ -0,0 +1,48 @@ +package com.github.jcustenborder.kafka.connect.transform.common; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Timestamp; + +import java.time.Instant; +import java.util.Date; + +public enum TimestampNowFieldTargetType { + DATE { + @Override + boolean isMatchingSchema(Schema schema) { + return Timestamp.SCHEMA.type() == getSchema().type() && Timestamp.SCHEMA.name().equals(schema.name()); + } + + @Override + Schema getSchema() { + return Timestamp.SCHEMA; + } + + @Override + Object getFormattedTimestamp(long timeInMillis) { + return new Date(timeInMillis); + } + }, + UNIX { + @Override + boolean isMatchingSchema(Schema schema) { + return Schema.Type.INT64 == schema.type() && null == schema.name(); + } + + @Override + Schema getSchema() { + return Schema.INT64_SCHEMA; + } + + @Override + Object getFormattedTimestamp(long timeInMillis) { + return Instant.ofEpochMilli(timeInMillis).getEpochSecond(); + } + }; + + abstract boolean isMatchingSchema(Schema schema); + + abstract Schema getSchema(); + + abstract Object getFormattedTimestamp(long timeInMillis); +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java index bd7667b..4a6c84d 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java @@ -43,15 +43,15 @@ public void afterEach() { } enum StructUseCases { - Date(TimestampNowFieldConfig.TargetType.Date, Timestamp.SCHEMA, timestamp), - Unix(TimestampNowFieldConfig.TargetType.Unix, Schema.INT64_SCHEMA, timestamp.toInstant().getEpochSecond()); + Date(TimestampNowFieldTargetType.DATE, Timestamp.SCHEMA, timestamp), + Unix(TimestampNowFieldTargetType.UNIX, Schema.INT64_SCHEMA, timestamp.toInstant().getEpochSecond()); - public final TimestampNowFieldConfig.TargetType targetType; + public final TimestampNowFieldTargetType targetType; public final Schema schema; public final Object value; - StructUseCases(TimestampNowFieldConfig.TargetType targetType, Schema schema, Object value) { + StructUseCases(TimestampNowFieldTargetType targetType, Schema schema, Object value) { this.targetType = targetType; this.schema = schema; this.value = value; From 8b1a03fa80ca673bc688ebaccd1fd72a41856d85 Mon Sep 17 00:00:00 2001 From: "jeffrey.vanderlaan" Date: Tue, 30 May 2023 18:55:32 +0200 Subject: [PATCH 4/6] Fix casing and change indentation --- .editorconfig | 4 + .../transform/common/TimestampNowField.java | 7 +- .../common/TimestampNowFieldConfig.java | 2 +- .../common/TimestampNowFieldTargetType.java | 91 +++++++++++-------- .../config/validators/ValidChronoUnit.java | 77 +++++++++------- .../common/TimestampNowFieldTest.java | 2 +- 6 files changed, 111 insertions(+), 72 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..37e649f --- /dev/null +++ b/.editorconfig @@ -0,0 +1,4 @@ +root = true +[*.java] +indent_style = space +indent_size = 2 diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java index f2de19c..170935c 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java @@ -22,7 +22,11 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.ConnectRecord; -import org.apache.kafka.connect.data.*; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import java.util.Collection; import java.util.HashMap; @@ -30,6 +34,7 @@ import java.util.Map; import java.util.stream.Collectors; + @Title("TimestampNowField") @Description("This transformation is used to set a field with the current timestamp of the system running the " + "transformation.") diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java index b5513b5..6989f88 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldConfig.java @@ -79,7 +79,7 @@ public static ConfigDef config() { ConfigKeyBuilder.of(TARGET_TYPE_CONF, ConfigDef.Type.STRING) .documentation(TARGET_TYPE_DOC) .importance(ConfigDef.Importance.LOW) - .defaultValue("Date") + .defaultValue("DATE") .validator(Validators.validEnum(TimestampNowFieldTargetType.class)) .build() ); diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java index 3cd77cc..e8efadb 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTargetType.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.jcustenborder.kafka.connect.transform.common; import org.apache.kafka.connect.data.Schema; @@ -7,42 +22,42 @@ import java.util.Date; public enum TimestampNowFieldTargetType { - DATE { - @Override - boolean isMatchingSchema(Schema schema) { - return Timestamp.SCHEMA.type() == getSchema().type() && Timestamp.SCHEMA.name().equals(schema.name()); - } - - @Override - Schema getSchema() { - return Timestamp.SCHEMA; - } - - @Override - Object getFormattedTimestamp(long timeInMillis) { - return new Date(timeInMillis); - } - }, - UNIX { - @Override - boolean isMatchingSchema(Schema schema) { - return Schema.Type.INT64 == schema.type() && null == schema.name(); - } - - @Override - Schema getSchema() { - return Schema.INT64_SCHEMA; - } - - @Override - Object getFormattedTimestamp(long timeInMillis) { - return Instant.ofEpochMilli(timeInMillis).getEpochSecond(); - } - }; - - abstract boolean isMatchingSchema(Schema schema); - - abstract Schema getSchema(); - - abstract Object getFormattedTimestamp(long timeInMillis); + DATE { + @Override + boolean isMatchingSchema(Schema schema) { + return Timestamp.SCHEMA.type() == getSchema().type() && Timestamp.SCHEMA.name().equals(schema.name()); + } + + @Override + Schema getSchema() { + return Timestamp.SCHEMA; + } + + @Override + Object getFormattedTimestamp(long timeInMillis) { + return new Date(timeInMillis); + } + }, + UNIX { + @Override + boolean isMatchingSchema(Schema schema) { + return Schema.Type.INT64 == schema.type() && null == schema.name(); + } + + @Override + Schema getSchema() { + return Schema.INT64_SCHEMA; + } + + @Override + Object getFormattedTimestamp(long timeInMillis) { + return Instant.ofEpochMilli(timeInMillis).getEpochSecond(); + } + }; + + abstract boolean isMatchingSchema(Schema schema); + + abstract Schema getSchema(); + + abstract Object getFormattedTimestamp(long timeInMillis); } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java b/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java index 96726f6..fe52864 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/validators/ValidChronoUnit.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.jcustenborder.kafka.connect.utils.config.validators; import com.google.common.base.Joiner; @@ -10,38 +25,38 @@ import java.util.stream.Collectors; public class ValidChronoUnit implements ConfigDef.Validator { - List validOptions = Arrays.stream(ChronoUnit.values()).map(ChronoUnit::name).collect(Collectors.toList()); + List validOptions = Arrays.stream(ChronoUnit.values()).map(ChronoUnit::name).collect(Collectors.toList()); - @Override - public void ensureValid(String s, Object o) { - if (o instanceof String) { - if (!validOptions.contains(o)) { - throw new ConfigException( - s, - String.format( - "'%s' is not a valid value for %s. Valid values are %s.", - o, - ChronoUnit.class.getSimpleName(), - Joiner.on(", ").join(validOptions) - ) - ); - } - } else if (o instanceof List) { - List list = (List) o; - for (Object i : list) { - ensureValid(s, i); - } - } else { - throw new ConfigException( - s, - o, - "Must be a String or List" - ); - } + @Override + public void ensureValid(String s, Object o) { + if (o instanceof String) { + if (!validOptions.contains(o)) { + throw new ConfigException( + s, + String.format( + "'%s' is not a valid value for %s. Valid values are %s.", + o, + ChronoUnit.class.getSimpleName(), + Joiner.on(", ").join(validOptions) + ) + ); + } + } else if (o instanceof List) { + List list = (List) o; + for (Object i : list) { + ensureValid(s, i); + } + } else { + throw new ConfigException( + s, + o, + "Must be a String or List" + ); } + } - @Override - public String toString() { - return "Matches: ``" + Joiner.on("``, ``").join(this.validOptions) + "``"; - } + @Override + public String toString() { + return "Matches: ``" + Joiner.on("``, ``").join(this.validOptions) + "``"; + } } diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java index 4a6c84d..af8c8af 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java @@ -289,7 +289,7 @@ public void mapFormattedAsUnix() { this.transformation.configure( ImmutableMap.of( TimestampNowFieldConfig.FIELDS_CONF, "timestamp", - TimestampNowFieldConfig.TARGET_TYPE_CONF, "Unix" + TimestampNowFieldConfig.TARGET_TYPE_CONF, "UNIX" ) ); final Map expected = ImmutableMap.of( From 4ec40c24080fd9bdb9db2da0090922b011ac11d4 Mon Sep 17 00:00:00 2001 From: "jeffrey.vanderlaan" Date: Fri, 2 Jun 2023 12:02:08 +0200 Subject: [PATCH 5/6] Actually add the wanted number of ChronoUnits --- .../transform/common/TimestampNowField.java | 2 +- .../common/TimestampNowFieldTest.java | 31 +++++++++---------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java index 170935c..7326dda 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowField.java @@ -95,7 +95,7 @@ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inpu private Object getFormattedTimestamp() { long desiredTimeInMillis = this.time.milliseconds(); if (config.addAmount > 0) { - desiredTimeInMillis += config.addChronoUnit.getDuration().toMillis(); + desiredTimeInMillis += config.addAmount * config.addChronoUnit.getDuration().toMillis(); } return this.config.targetType.getFormattedTimestamp(desiredTimeInMillis); } diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java index af8c8af..aeff35b 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java @@ -12,7 +12,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import java.time.temporal.ChronoUnit; import java.util.Date; import java.util.Map; @@ -24,13 +23,13 @@ public class TimestampNowFieldTest { TimestampNowField transformation; - static Date timestamp = new Date(1586963336123L); + static Date timestampNow = new Date(1586963336123L); @BeforeEach public void beforeEach() { this.transformation = new TimestampNowField.Value<>(); Time time = mock(Time.class); - when(time.milliseconds()).thenReturn(timestamp.getTime()); + when(time.milliseconds()).thenReturn(timestampNow.getTime()); this.transformation.time = time; this.transformation.configure( ImmutableMap.of(TimestampNowFieldConfig.FIELDS_CONF, "timestamp") @@ -43,8 +42,8 @@ public void afterEach() { } enum StructUseCases { - Date(TimestampNowFieldTargetType.DATE, Timestamp.SCHEMA, timestamp), - Unix(TimestampNowFieldTargetType.UNIX, Schema.INT64_SCHEMA, timestamp.toInstant().getEpochSecond()); + Date(TimestampNowFieldTargetType.DATE, Timestamp.SCHEMA, timestampNow), + Unix(TimestampNowFieldTargetType.UNIX, Schema.INT64_SCHEMA, timestampNow.toInstant().getEpochSecond()); public final TimestampNowFieldTargetType targetType; public final Schema schema; @@ -146,12 +145,12 @@ public void structFieldExists(StructUseCases useCase) { } @Test - public void structAddOneDay() { - Date timestampPlusOneDay = Date.from(timestamp.toInstant().plus(1, ChronoUnit.DAYS)); + public void structAddTwoDays() { + Date timestampNowPlusTwoDays = new Date(1587136136123L); this.transformation.configure( ImmutableMap.of( TimestampNowFieldConfig.FIELDS_CONF, "timestamp", - TimestampNowFieldConfig.ADD_AMOUNT_CONF, "1", + TimestampNowFieldConfig.ADD_AMOUNT_CONF, "2", TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "DAYS" ) ); @@ -172,7 +171,7 @@ public void structAddOneDay() { final Struct expectedStruct = new Struct(expectedSchema) .put("firstName", "example") .put("lastName", "user") - .put("timestamp", timestampPlusOneDay); + .put("timestamp", timestampNowPlusTwoDays); final SinkRecord input = new SinkRecord( "test", 1, @@ -236,7 +235,7 @@ public void structFieldMismatch(StructUseCases useCase) { @Test public void mapFieldMissing() { final Map expected = ImmutableMap.of( - "firstName", "example", "lastName", "user", "timestamp", timestamp + "firstName", "example", "lastName", "user", "timestamp", timestampNow ); final SinkRecord input = new SinkRecord( "test", @@ -255,17 +254,17 @@ public void mapFieldMissing() { } @Test - public void mapAddOneDay() { - Date timestampPlusOneDay = Date.from(timestamp.toInstant().plus(1, ChronoUnit.DAYS)); + public void mapAddThreeHours() { + Date timestampPlusThreeHours = new Date(1586974136123L); this.transformation.configure( ImmutableMap.of( TimestampNowFieldConfig.FIELDS_CONF, "timestamp", - TimestampNowFieldConfig.ADD_AMOUNT_CONF, "1", - TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "DAYS" + TimestampNowFieldConfig.ADD_AMOUNT_CONF, "3", + TimestampNowFieldConfig.ADD_CHRONO_UNIT_CONF, "HOURS" ) ); final Map expected = ImmutableMap.of( - "firstName", "example", "lastName", "user", "timestamp", timestampPlusOneDay + "firstName", "example", "lastName", "user", "timestamp", timestampPlusThreeHours ); final SinkRecord input = new SinkRecord( "test", @@ -285,7 +284,7 @@ public void mapAddOneDay() { @Test public void mapFormattedAsUnix() { - long timestampFormattedAsUnix = timestamp.toInstant().getEpochSecond(); + long timestampFormattedAsUnix = timestampNow.toInstant().getEpochSecond(); this.transformation.configure( ImmutableMap.of( TimestampNowFieldConfig.FIELDS_CONF, "timestamp", From 9e51b79e5db15a568a1336c930cad8cc8c8606f5 Mon Sep 17 00:00:00 2001 From: "jeffrey.vanderlaan" Date: Fri, 2 Jun 2023 12:23:06 +0200 Subject: [PATCH 6/6] Make the timestamps human readable --- .../common/TimestampNowFieldTest.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java index aeff35b..b442b8a 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/TimestampNowFieldTest.java @@ -12,6 +12,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Date; import java.util.Map; @@ -23,13 +25,14 @@ public class TimestampNowFieldTest { TimestampNowField transformation; - static Date timestampNow = new Date(1586963336123L); + static Instant timestampNow = Instant.parse("2023-06-02T10:06:45+00:00"); + static Date timestampNowAsDate = Date.from(timestampNow); @BeforeEach public void beforeEach() { this.transformation = new TimestampNowField.Value<>(); Time time = mock(Time.class); - when(time.milliseconds()).thenReturn(timestampNow.getTime()); + when(time.milliseconds()).thenReturn(timestampNow.toEpochMilli()); this.transformation.time = time; this.transformation.configure( ImmutableMap.of(TimestampNowFieldConfig.FIELDS_CONF, "timestamp") @@ -42,8 +45,8 @@ public void afterEach() { } enum StructUseCases { - Date(TimestampNowFieldTargetType.DATE, Timestamp.SCHEMA, timestampNow), - Unix(TimestampNowFieldTargetType.UNIX, Schema.INT64_SCHEMA, timestampNow.toInstant().getEpochSecond()); + DATE(TimestampNowFieldTargetType.DATE, Timestamp.SCHEMA, timestampNowAsDate), + UNIX(TimestampNowFieldTargetType.UNIX, Schema.INT64_SCHEMA, timestampNow.getEpochSecond()); public final TimestampNowFieldTargetType targetType; public final Schema schema; @@ -146,7 +149,7 @@ public void structFieldExists(StructUseCases useCase) { @Test public void structAddTwoDays() { - Date timestampNowPlusTwoDays = new Date(1587136136123L); + Date timestampNowPlusTwoDays = Date.from(timestampNow.plus(2, ChronoUnit.DAYS)); this.transformation.configure( ImmutableMap.of( TimestampNowFieldConfig.FIELDS_CONF, "timestamp", @@ -235,7 +238,7 @@ public void structFieldMismatch(StructUseCases useCase) { @Test public void mapFieldMissing() { final Map expected = ImmutableMap.of( - "firstName", "example", "lastName", "user", "timestamp", timestampNow + "firstName", "example", "lastName", "user", "timestamp", timestampNowAsDate ); final SinkRecord input = new SinkRecord( "test", @@ -255,7 +258,7 @@ public void mapFieldMissing() { @Test public void mapAddThreeHours() { - Date timestampPlusThreeHours = new Date(1586974136123L); + Date timestampPlusThreeHours = Date.from(timestampNow.plus(3, ChronoUnit.HOURS)); this.transformation.configure( ImmutableMap.of( TimestampNowFieldConfig.FIELDS_CONF, "timestamp", @@ -284,7 +287,7 @@ public void mapAddThreeHours() { @Test public void mapFormattedAsUnix() { - long timestampFormattedAsUnix = timestampNow.toInstant().getEpochSecond(); + long timestampFormattedAsUnix = timestampNow.getEpochSecond(); this.transformation.configure( ImmutableMap.of( TimestampNowFieldConfig.FIELDS_CONF, "timestamp",