diff --git a/marmaray/src/main/java/com/uber/marmaray/common/HoodieErrorPayload.java b/marmaray/src/main/java/com/uber/marmaray/common/HoodieErrorPayload.java
index 11e9551..bcd1c29 100644
--- a/marmaray/src/main/java/com/uber/marmaray/common/HoodieErrorPayload.java
+++ b/marmaray/src/main/java/com/uber/marmaray/common/HoodieErrorPayload.java
@@ -16,8 +16,8 @@
  */
 package com.uber.marmaray.common;
 
-import com.uber.hoodie.common.model.HoodieRecordPayload;
-import com.uber.hoodie.common.util.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.HoodieAvroUtils;
 import com.uber.marmaray.common.exceptions.JobRuntimeException;
 import org.apache.avro.Schema;
@@ -25,6 +25,7 @@
 import org.apache.avro.generic.IndexedRecord;
 import lombok.NonNull;
 
+import org.apache.hudi.common.util.Option;
 import java.io.IOException;
 import java.util.Optional;
 
@@ -40,13 +41,13 @@ public HoodieErrorPayload(@NonNull final GenericRecord record) {
     }
 
     @Override
-    public Optional<IndexedRecord> getInsertValue(final Schema schema) throws IOException {
-        final Optional<GenericRecord> record = getRecord();
+    public Option<IndexedRecord> getInsertValue(final Schema schema) throws IOException {
+        final Option<GenericRecord> record = getRecord();
         return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema));
     }
 
-    protected Optional<GenericRecord> getRecord() {
-        return Optional.of(this.record);
+    protected Option<GenericRecord> getRecord() {
+        return Option.of(this.record);
     }
 
     @Override
@@ -55,7 +56,7 @@ public HoodieErrorPayload preCombine(final HoodieErrorPayload hoodieErrorPayload
     }
 
     @Override
-    public Optional<IndexedRecord> combineAndGetUpdateValue(final IndexedRecord indexedRecord, final Schema schema)
+    public Option<IndexedRecord> combineAndGetUpdateValue(final IndexedRecord indexedRecord, final Schema schema)
         throws IOException {
         throw new JobRuntimeException("Not implemented yet!!");
     }
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java
index 171f228..de788cd 100644
--- a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java
+++ b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java
@@ -17,16 +17,17 @@
 package com.uber.marmaray.common.configuration;
 
 import com.google.common.base.Optional;
-import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.model.HoodieCleaningPolicy;
-import com.uber.hoodie.common.table.HoodieTableConfig;
-import com.uber.hoodie.config.HoodieCompactionConfig;
-import com.uber.hoodie.config.HoodieIndexConfig;
-import com.uber.hoodie.config.HoodieMetricsConfig;
-import com.uber.hoodie.config.HoodieStorageConfig;
-import com.uber.hoodie.config.HoodieWriteConfig;
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieMetricsConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import com.uber.marmaray.common.exceptions.JobRuntimeException;
 import com.uber.marmaray.common.exceptions.MissingPropertyException;
+import com.uber.marmaray.common.sinks.hoodie.HoodieSink;
 import com.uber.marmaray.utilities.ConfigUtil;
 import com.uber.marmaray.utilities.StringTypes;
 import lombok.Getter;
@@ -65,6 +66,22 @@ public class HoodieConfiguration implements Serializable {
      * Schema for Hoodie dataset
      */
    public static final String HOODIE_AVRO_SCHEMA = HOODIE_COMMON_PROPERTY_PREFIX + "schema";
+
+    /**
+     * Record Key for Hoodie dataset
+     */
+    public static final String HOODIE_RECORD_KEY = HOODIE_COMMON_PROPERTY_PREFIX + "record_key";
+
+    /**
+     * Partition path for Hoodie dataset
+     */
+    public static final String HOODIE_PARTITION_PATH = HOODIE_COMMON_PROPERTY_PREFIX + "partition_path";
+
+    /**
+     * Sink operation for Hoodie dataset
+     */
+    public static final String HOODIE_SINK_OP = HOODIE_COMMON_PROPERTY_PREFIX + "sink_op";
+
     /**
      * Flag to control whether it should combine before insert
      */
@@ -250,6 +267,31 @@ public String getTableName() {
         return this.getConf().getProperty(getTablePropertyKey(HOODIE_TABLE_NAME, this.tableKey)).get();
     }
 
+    /**
+     * @return hoodie record key.
+     */
+    public Optional<String> getHoodieRecordKey() {
+        return this.conf.getProperty(getTablePropertyKey(HOODIE_RECORD_KEY, this.tableKey));
+    }
+
+    /**
+     * @return hoodie partition path.
+     */
+    public Optional<String> getHoodiePartitionPath() {
+        return this.conf.getProperty(getTablePropertyKey(HOODIE_PARTITION_PATH, this.tableKey));
+    }
+
+    /**
+     * @return hoodie sink operation
+     */
+    public HoodieSink.HoodieSinkOp getHoodieSinkOp() {
+        Optional<String> sinkOp = this.conf.getProperty(getTablePropertyKey(HOODIE_SINK_OP, this.tableKey));
+        if (sinkOp.isPresent()) {
+            return HoodieSink.HoodieSinkOp.valueOf(sinkOp.get().toUpperCase());
+        }
+        return HoodieSink.HoodieSinkOp.BULK_INSERT;
+    }
+
     /**
      * @return hoodie metrics prefix.
      * */
@@ -263,7 +305,7 @@ public String getHoodieDataPartitioner(@NotEmpty final String defaultDataPartiti
     }
 
     /**
-     * @return true if {@link com.uber.hoodie.HoodieWriteClient} should rollback inflight commits from previous write
+     * @return true if {@link org.apache.hudi.HoodieWriteClient} should rollback inflight commits from previous write
      * call.
     */
     public boolean shouldRollbackInFlight() {
@@ -392,6 +434,7 @@ public HoodieWriteConfig getHoodieWriteConfig() {
                     ).build());
 
             // Hoodie index config
+
             builder.withIndexConfig(new HoodieIndexConfiguration(getConf(), getTableKey()).configureHoodieIndex());
 
             // Hoodie metrics config
@@ -421,12 +464,6 @@ public HoodieWriteConfig getHoodieWriteConfig() {
                 throw new JobRuntimeException(errorStr, e);
             }
 
-            // enable tmp directory writes for hoodie.
-            builder.withUseTempFolderCopyOnWriteForCreate(true);
-
-            // enabled the renaming for copy detection on merge
-            builder.withUseTempFolderCopyOnWriteForMerge(true);
-
             return builder.build();
         } catch (IllegalArgumentException e) {
             throw new MissingPropertyException(e.getMessage(), e);
         }
@@ -492,11 +529,26 @@ public Builder withBasePath(@NotEmpty final String basePath) {
             return this;
         }
 
+        public Builder withRecordKey(@NotEmpty final String recordKey) {
+            this.conf.setProperty(getTablePropertyKey(HOODIE_RECORD_KEY, this.tableKey), recordKey);
+            return this;
+        }
+
+        public Builder withPartitionPath(@NotEmpty final String partitionPath) {
+            this.conf.setProperty(getTablePropertyKey(HOODIE_PARTITION_PATH, this.tableKey), partitionPath);
+            return this;
+        }
+
         public Builder withSchema(@NotEmpty final String schema) {
             this.conf.setProperty(getTablePropertyKey(HOODIE_AVRO_SCHEMA, this.tableKey), schema);
             return this;
         }
 
+        public Builder withSinkOp(@NotEmpty final String sinkOp) {
+            this.conf.setProperty(getTablePropertyKey(HOODIE_SINK_OP, this.tableKey), sinkOp);
+            return this;
+        }
+
         public Builder withBulkInsertParallelism(final int parallelism) {
             this.conf.setProperty(
                 getTablePropertyKey(HOODIE_BULKINSERT_PARALLELISM, this.tableKey), Integer.toString(parallelism));
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieIndexConfiguration.java b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieIndexConfiguration.java
index 4db0214..b43dbf2 100644
--- a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieIndexConfiguration.java
+++ b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieIndexConfiguration.java
@@ -18,9 +18,11 @@
 package com.uber.marmaray.common.configuration;
 
 import com.google.common.base.Preconditions;
-import com.uber.hoodie.config.HoodieIndexConfig;
-import com.uber.hoodie.index.HoodieIndex;
-import com.uber.hoodie.index.HoodieIndex.IndexType;
+import org.apache.hudi.config.DefaultHoodieConfig;
+import org.apache.hudi.config.HoodieHBaseIndexConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieIndex.IndexType;
 import com.uber.marmaray.common.exceptions.JobRuntimeException;
 import com.uber.marmaray.utilities.StringTypes;
 import lombok.Getter;
@@ -165,23 +167,26 @@ public HoodieIndexConfig configureHoodieIndex() {
             version = DEFAULT_VERSION;
         }
         final String topicName = getTableName();
+
         final HoodieIndexConfig.Builder builder = HoodieIndexConfig.newBuilder()
             .withIndexType(getHoodieIndexType());
-
         if (HoodieIndex.IndexType.HBASE.equals(getHoodieIndexType())) {
+            final HoodieHBaseIndexConfig.Builder hoodieHBaseIndexConfigBuilder = HoodieHBaseIndexConfig.newBuilder();
             final String quorum = getHoodieIndexZookeeperQuorum();
             final Integer port = getHoodieIndexZookeeperPort();
             final String zkZnodeParent = getZkZnodeParent();
             createHbaseIndexTableIfNotExists(topicName, quorum, port.toString(), zkZnodeParent, version);
-            builder
+            hoodieHBaseIndexConfigBuilder
                 .hbaseIndexGetBatchSize(getHoodieIndexGetBatchSize())
                 .hbaseTableName(getHoodieHbaseIndexTableName())
                 .hbaseZkPort(port)
                 .hbaseZkQuorum(quorum);
+            final HoodieHBaseIndexConfig hoodieHBaseIndexConfig = hoodieHBaseIndexConfigBuilder.build();
+            builder.withHBaseIndexConfig(hoodieHBaseIndexConfig);
         }
-        return builder.build();
+        return builder.build();
     }
 
     public void createHbaseIndexTableIfNotExists(@NotEmpty final String dataFeed, @NotEmpty final String zkQuorum,
diff --git
a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/DummyHoodieSinkDataConverter.java b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/DummyHoodieSinkDataConverter.java index 54224e5..d93d36b 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/DummyHoodieSinkDataConverter.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/DummyHoodieSinkDataConverter.java @@ -19,6 +19,7 @@ import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; +import com.uber.marmaray.common.configuration.HoodieConfiguration; import com.uber.marmaray.utilities.ErrorExtractor; import lombok.NonNull; @@ -29,7 +30,9 @@ */ public class DummyHoodieSinkDataConverter extends HoodieSinkDataConverter { public DummyHoodieSinkDataConverter() { - super(new Configuration(), new ErrorExtractor()); + + super(new Configuration(), new ErrorExtractor(), HoodieConfiguration.newBuilder(new Configuration(), + "test").build()); } @Override diff --git a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java index 265cedd..8714ef6 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java @@ -16,13 +16,15 @@ */ package com.uber.marmaray.common.converters.data; -import com.uber.hoodie.common.model.HoodieAvroPayload; -import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; +import com.uber.marmaray.common.configuration.HoodieConfiguration; import com.uber.marmaray.common.converters.converterresult.ConverterResult; +import com.uber.marmaray.common.exceptions.InvalidDataException; import com.uber.marmaray.common.metrics.DataFeedMetrics; import com.uber.marmaray.common.metrics.JobMetrics; import com.uber.marmaray.common.sinks.hoodie.HoodieSink; @@ -35,27 +37,35 @@ import java.util.Collections; import java.util.List; +import com.google.common.base.Optional; +import org.apache.hudi.common.util.Option; + /** * {@link HoodieSinkDataConverter} extends {@link SinkDataConverter} - * This class is used by {@link HoodieSink} to generate {@link com.uber.hoodie.common.model.HoodieRecord} from + * This class is used by {@link HoodieSink} to generate {@link org.apache.hudi.common.model.HoodieRecord} from * {@link com.uber.marmaray.common.AvroPayload}. */ -public abstract class HoodieSinkDataConverter extends SinkDataConverter> { +public class HoodieSinkDataConverter extends SinkDataConverter> { // store the schema as a string since Schema doesn't serialize. Used in extended classes. 
protected String schema; private final ErrorExtractor errorExtractor; + private final HoodieConfiguration hoodieConfiguration; - public HoodieSinkDataConverter(@NonNull final Configuration conf, @NonNull final ErrorExtractor errorExtractor) { + public HoodieSinkDataConverter(@NonNull final Configuration conf, @NonNull final ErrorExtractor errorExtractor, + @NonNull final HoodieConfiguration hoodieConfiguration) { super(conf, errorExtractor); this.errorExtractor = errorExtractor; + this.hoodieConfiguration = hoodieConfiguration; } public HoodieSinkDataConverter(@NonNull final Configuration conf, final String schema, - @NonNull final ErrorExtractor errorExtractor) { + @NonNull final ErrorExtractor errorExtractor, + HoodieConfiguration hoodieConfiguration) { super(conf, errorExtractor); this.schema = schema; this.errorExtractor = errorExtractor; + this.hoodieConfiguration = hoodieConfiguration; } @Override @@ -82,7 +92,17 @@ protected final List hoodieRecordKey = hoodieConfiguration.getHoodieRecordKey(); + if (hoodieRecordKey.isPresent()) { + final Object recordKeyFieldVal = payload.getData().get(hoodieRecordKey.get()); + if (recordKeyFieldVal == null) { + throw new InvalidDataException("required field is missing:" + hoodieRecordKey.get()); + } + return recordKeyFieldVal.toString(); + } + throw new Exception("Hoodie Record Key missing"); + } /** * The implementation of it should use fields from {@link AvroPayload} to generate partition path which is needed @@ -90,9 +110,19 @@ protected final List hoodiePartitionPath = hoodieConfiguration.getHoodiePartitionPath(); + if (hoodiePartitionPath.isPresent()) { + final Object partitionFieldVal = payload.getData().get(hoodiePartitionPath.get()); + if (partitionFieldVal == null) { + throw new InvalidDataException("required field is missing:" + hoodiePartitionPath.get()); + } + return partitionFieldVal.toString(); + } + throw new Exception("Hoodie Partition Path missing"); + } protected HoodieRecordPayload getPayload(@NonNull final AvroPayload payload) { - return new HoodieAvroPayload(java.util.Optional.of(payload.getData())); + return new HoodieAvroPayload(Option.of(payload.getData())); } } diff --git a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/TSBasedHoodieSinkDataConverter.java b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/TSBasedHoodieSinkDataConverter.java index 44bea41..222b5ac 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/TSBasedHoodieSinkDataConverter.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/TSBasedHoodieSinkDataConverter.java @@ -18,8 +18,9 @@ import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; +import com.uber.marmaray.common.configuration.HoodieConfiguration; import com.uber.marmaray.common.exceptions.InvalidDataException; -import com.uber.hoodie.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieKey; import com.uber.marmaray.utilities.HoodieSinkConverterErrorExtractor; import java.text.SimpleDateFormat; import java.util.Date; @@ -33,47 +34,29 @@ /** * {@link TSBasedHoodieSinkDataConverter} extends {@link HoodieSinkDataConverter} * - * This class generates {@link HoodieKey} from given {@link AvroPayload}. The passed in {@link AvroPayload} requires - * {@link #partitionFieldName} with timestamp in {@link #timeUnit}. + * This class generates partition path from given {@link AvroPayload}. 
The passed in {@link AvroPayload} requires + * {@link HoodieConfiguration} with timestamp in {@link #timeUnit}. * - * {@link AvroPayload} also requires a {@link #recordKeyFieldName} which should be the primary key for the record. */ @Slf4j public class TSBasedHoodieSinkDataConverter extends HoodieSinkDataConverter { public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat(DATE_PARTITION_FORMAT); - @NotEmpty - private final String recordKeyFieldName; - @NotEmpty - private final String partitionFieldName; + @NonNull private final TimeUnit timeUnit; public TSBasedHoodieSinkDataConverter(@NonNull final Configuration conf, - @NotEmpty final String recordKeyFieldName, @NotEmpty final String partitionFieldName, - @NonNull final TimeUnit timeUnit) { - super(conf, new HoodieSinkConverterErrorExtractor()); - this.recordKeyFieldName = recordKeyFieldName; - this.partitionFieldName = partitionFieldName; + @NonNull final HoodieConfiguration hoodieConfiguration, + @NonNull final TimeUnit timeUnit) { + super(conf, new HoodieSinkConverterErrorExtractor(), hoodieConfiguration); this.timeUnit = timeUnit; } - @Override - protected String getRecordKey(@NonNull final AvroPayload payload) throws Exception { - final Object recordKeyFieldVal = payload.getField(recordKeyFieldName); - if (recordKeyFieldVal == null) { - throw new InvalidDataException("required field is missing:" + recordKeyFieldName); - } - return recordKeyFieldVal.toString(); - } - @Override protected String getPartitionPath(final AvroPayload payload) throws Exception { - final Object partitionFieldVal = payload.getField(partitionFieldName); - if (partitionFieldVal == null) { - throw new InvalidDataException("required field is missing:" + partitionFieldName); - } - final Date date = new Date(this.timeUnit.toMillis((long) Double.parseDouble(partitionFieldVal.toString()))); + String partitionFieldVal = super.getPartitionPath(payload); + final Date date = new Date(this.timeUnit.toMillis((long) Double.parseDouble(partitionFieldVal))); return PARTITION_FORMATTER.format(date); } } diff --git a/marmaray/src/main/java/com/uber/marmaray/common/metadata/HoodieBasedMetadataManager.java b/marmaray/src/main/java/com/uber/marmaray/common/metadata/HoodieBasedMetadataManager.java index d65f8a6..4820a89 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/metadata/HoodieBasedMetadataManager.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/metadata/HoodieBasedMetadataManager.java @@ -17,13 +17,13 @@ package com.uber.marmaray.common.metadata; import com.google.common.base.Optional; -import com.uber.hoodie.HoodieWriteClient; -import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.model.HoodieAvroPayload; -import com.uber.hoodie.common.model.HoodieCommitMetadata; -import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; -import com.uber.hoodie.common.table.timeline.HoodieInstant; +import org.apache.hudi.HoodieWriteClient; +import org.apache.hudi.WriteStatus; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; import com.uber.marmaray.common.configuration.HadoopConfiguration; import com.uber.marmaray.common.configuration.HoodieConfiguration; import 
com.uber.marmaray.common.exceptions.JobRuntimeException; @@ -36,6 +36,7 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.common.util.Option; import org.apache.parquet.Strings; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -61,6 +62,7 @@ public class HoodieBasedMetadataManager implements IMetadataManager @Getter private final HoodieConfiguration hoodieConf; + private final HadoopConfiguration hadoopConf; private final AtomicBoolean saveChanges; private transient Optional jsc = Optional.absent(); private Optional> metadataMap = Optional.absent(); @@ -72,8 +74,11 @@ public class HoodieBasedMetadataManager implements IMetadataManager * @param hoodieConf {@link HoodieConfiguration} */ public HoodieBasedMetadataManager(@NonNull final HoodieConfiguration hoodieConf, - @NonNull final AtomicBoolean shouldSaveChanges, @NonNull final JavaSparkContext jsc) throws IOException { + @NonNull final HadoopConfiguration hadoopConf, + @NonNull final AtomicBoolean shouldSaveChanges, + @NonNull final JavaSparkContext jsc) throws IOException { this.hoodieConf = hoodieConf; + this.hadoopConf = hadoopConf; this.saveChanges = shouldSaveChanges; this.jsc = Optional.of(jsc); } @@ -90,7 +95,7 @@ public void setJobMetrics(@NonNull final JobMetrics jobMetrics) { private Map getMetadataMap() { if (!this.metadataMap.isPresent()) { - this.metadataMap = Optional.of(readMetadataInfo(this.hoodieConf)); + this.metadataMap = Optional.of(readMetadataInfo(this.hoodieConf, this.hadoopConf)); } return this.metadataMap.get(); } @@ -161,7 +166,7 @@ public void saveChanges() { final List dummyWrites = new ArrayList<>(); final boolean ret = writeClient - .commit(commitTime, jsc.get().parallelize(dummyWrites), java.util.Optional.of(getMetadataInfo())); + .commit(commitTime, jsc.get().parallelize(dummyWrites), Option.of(getMetadataInfo())); if (!ret) { throw new JobRuntimeException("Failed to save metadata information."); } @@ -171,7 +176,7 @@ public void saveChanges() { * This method will also be used by HoodieSink to retrieve and store metadata information. * It returns {@link HashMap} with hoodie metadata information to be saved into commit file. * It returns {@link HashMap} instead of {@link Map} because hoodie needs it that way. Checkout - * {@link HoodieWriteClient#commit(String, JavaRDD, java.util.Optional)} for more info. + * {@link HoodieWriteClient#commit(String, JavaRDD, Option)} for more info. */ public HashMap getMetadataInfo() { final HashMap map = new HashMap<>(); @@ -184,20 +189,21 @@ public HashMap getMetadataInfo() { * {@link #HOODIE_METADATA_KEY} key. 
*/ private static Map readMetadataInfo( - @NonNull final HoodieConfiguration hoodieConf) { + @NonNull final HoodieConfiguration hoodieConf, @NonNull final HadoopConfiguration hadoopConf) { try { final FileSystem fs = FSUtils.getFs(hoodieConf.getConf(), Optional.of(hoodieConf.getBasePath())); - HoodieUtil.initHoodieDataset(fs, hoodieConf); + HoodieUtil.initHoodieDataset(fs, hadoopConf, hoodieConf); final HoodieTableMetaClient hoodieTableMetaClient = new HoodieTableMetaClient(new HadoopConfiguration(hoodieConf.getConf()).getHadoopConf(), hoodieConf.getBasePath(), true); final HoodieActiveTimeline hoodieActiveTimeline = hoodieTableMetaClient.getActiveTimeline(); - final java.util.Optional lastInstant = hoodieActiveTimeline.getCommitTimeline() + final Option lastInstant = hoodieActiveTimeline.getCommitTimeline() .filterCompletedInstants().lastInstant(); if (lastInstant.isPresent()) { log.info("using hoodie instant for reading checkpoint info :{}", lastInstant.get().getTimestamp()); final HoodieCommitMetadata commitMetadata = - HoodieCommitMetadata.fromBytes(hoodieActiveTimeline.getInstantDetails(lastInstant.get()).get()); + HoodieCommitMetadata.fromBytes(hoodieActiveTimeline.getInstantDetails(lastInstant.get()).get(), + HoodieCommitMetadata.class); final String serCommitInfo = commitMetadata.getMetadata(HOODIE_METADATA_KEY); if (!Strings.isNullOrEmpty(serCommitInfo)) { return MapUtil.deserializeMap(serCommitInfo); diff --git a/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/AbstractKafkaSchemaServiceReader.java b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/AbstractKafkaSchemaServiceReader.java new file mode 100644 index 0000000..9e30115 --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/AbstractKafkaSchemaServiceReader.java @@ -0,0 +1,28 @@ +package com.uber.marmaray.common.schema.kafka; + + +import com.uber.marmaray.common.schema.ISchemaService; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import org.hibernate.validator.constraints.NotEmpty; + +import java.io.Serializable; + +@Slf4j +public abstract class AbstractKafkaSchemaServiceReader implements ISchemaService.ISchemaServiceReader, Serializable { + private final String schemaString; + private transient Schema schema; + + AbstractKafkaSchemaServiceReader(@NotEmpty final Schema schema) { + this.schemaString = schema.toString(); + this.schema = schema; + log.info("Kafka Schema service reader initialised with schema {}", schemaString); + } + + Schema getSchema() { + if (this.schema == null) { + this.schema = new Schema.Parser().parse(this.schemaString); + } + return this.schema; + } +} \ No newline at end of file diff --git a/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaAvroServiceReader.java b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaAvroServiceReader.java new file mode 100644 index 0000000..dd94e9a --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaAvroServiceReader.java @@ -0,0 +1,28 @@ +package com.uber.marmaray.common.schema.kafka; + +import com.uber.marmaray.common.exceptions.InvalidDataException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; + +import java.io.IOException; + +public class KafkaSchemaAvroServiceReader extends 
AbstractKafkaSchemaServiceReader { + public KafkaSchemaAvroServiceReader(Schema schema) { + super(schema); + } + + @Override + public GenericRecord read(byte[] buffer) throws InvalidDataException { + final DatumReader datumReader = new GenericDatumReader<>(getSchema()); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(buffer, null); + try { + return datumReader.read(null, decoder); + } catch (IOException e) { + throw new InvalidDataException("Error decoding data", e); + } + } +} diff --git a/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaJSONServiceReader.java b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaJSONServiceReader.java new file mode 100644 index 0000000..65603bb --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaJSONServiceReader.java @@ -0,0 +1,28 @@ +package com.uber.marmaray.common.schema.kafka; + +import com.uber.marmaray.common.exceptions.InvalidDataException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.JsonDecoder; + +import java.io.IOException; + +public class KafkaSchemaJSONServiceReader extends AbstractKafkaSchemaServiceReader { + public KafkaSchemaJSONServiceReader(Schema schema) { + super(schema); + } + + @Override + public GenericRecord read(byte[] buffer) throws InvalidDataException { + final DatumReader datumReader = new GenericDatumReader<>(getSchema()); + try { + JsonDecoder decoder = DecoderFactory.get().jsonDecoder(getSchema(), new String(buffer)); + return datumReader.read(null, decoder); + } catch (IOException e) { + throw new InvalidDataException("Error decoding data", e); + } + } +} diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieErrorSink.java b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieErrorSink.java index 7dc97b5..8f0122d 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieErrorSink.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieErrorSink.java @@ -17,10 +17,11 @@ package com.uber.marmaray.common.sinks.hoodie; import com.google.common.base.Optional; -import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.exception.HoodieInsertException; -import com.uber.hoodie.exception.HoodieUpsertException; +import org.apache.hudi.WriteStatus; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.exception.HoodieUpsertException; +import com.uber.marmaray.common.configuration.HadoopConfiguration; import com.uber.marmaray.common.configuration.HoodieConfiguration; import com.uber.marmaray.common.converters.data.HoodieSinkDataConverter; import com.uber.marmaray.common.exceptions.JobRuntimeException; @@ -42,12 +43,12 @@ public class HoodieErrorSink extends HoodieSink { public HoodieErrorSink(@NonNull final HoodieConfiguration hoodieConf, + @NonNull final HadoopConfiguration hadoopConf, @NonNull final HoodieSinkDataConverter hoodieSinkDataConverter, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op, @NonNull final IMetadataManager metadataMgr, final boolean shouldSaveChangesInFuture) { - super(hoodieConf, hoodieSinkDataConverter, jsc, op, metadataMgr, shouldSaveChangesInFuture, Optional.absent()); + super(hoodieConf, 
hadoopConf, hoodieSinkDataConverter, jsc, metadataMgr, shouldSaveChangesInFuture, Optional.absent()); } public void writeRecordsAndErrors(@NonNull final HoodieWriteResult result) { diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java index 2d6316a..5935a17 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java @@ -18,15 +18,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.uber.hoodie.HoodieWriteClient; -import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.exception.HoodieInsertException; -import com.uber.hoodie.exception.HoodieUpsertException; -import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.HoodieWriteClient; +import org.apache.hudi.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; import com.uber.marmaray.common.AvroPayload; +import com.uber.marmaray.common.configuration.HadoopConfiguration; import com.uber.marmaray.common.configuration.HoodieConfiguration; import com.uber.marmaray.common.converters.data.HoodieSinkDataConverter; import com.uber.marmaray.common.data.ErrorData; @@ -56,7 +58,6 @@ import org.apache.spark.storage.StorageLevel; import org.apache.spark.util.LongAccumulator; import org.hibernate.validator.constraints.NotEmpty; -import scala.Option; import scala.Tuple2; import java.io.Closeable; @@ -77,6 +78,7 @@ public class HoodieSink implements ISink, scala.Serializable { private static final String TABLE_NAME = "table_name"; private final HoodieConfiguration hoodieConf; + private final HadoopConfiguration hadoopConf; // It is used for generating HoodieKey from AvroPayload. 
private final HoodieSinkDataConverter hoodieSinkDataConverter; private final transient JavaSparkContext jsc; @@ -114,25 +116,26 @@ public class HoodieSink implements ISink, scala.Serializable { private HoodieSinkOperations hoodieSinkOperations = new HoodieSinkOperations(); public HoodieSink(@NonNull final HoodieConfiguration hoodieConf, + @NonNull final HadoopConfiguration hadoopConf, @NonNull final HoodieSinkDataConverter hoodieSinkDataConverter, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op, @NonNull final IMetadataManager metadataMgr, @NonNull final Optional defaultDataPartitioner) { - this(hoodieConf, hoodieSinkDataConverter, jsc, op, metadataMgr, false, defaultDataPartitioner); + this(hoodieConf, hadoopConf, hoodieSinkDataConverter, jsc, metadataMgr, false, defaultDataPartitioner); } public HoodieSink(@NonNull final HoodieConfiguration hoodieConf, + @NonNull final HadoopConfiguration hadoopConf, @NonNull final HoodieSinkDataConverter hoodieSinkDataConverter, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op, @NonNull final IMetadataManager metadataMgr, final boolean shouldSaveChangesInFuture, @NonNull final Optional defaultDataPartitioner) { this.hoodieConf = hoodieConf; + this.hadoopConf = hadoopConf; this.hoodieSinkDataConverter = hoodieSinkDataConverter; this.jsc = jsc; - this.op = op; + this.op = hoodieConf.getHoodieSinkOp(); this.metadataMgr = metadataMgr; this.sinkStatMgr = new SinkStatManager(this.hoodieConf.getTableName(), this.metadataMgr); this.sinkStatMgr.init(); @@ -175,7 +178,7 @@ public void write(@NonNull final RDDWrapper> h final HoodieWriteConfig hoodieWriteConfig = this.hoodieConf.getHoodieWriteConfig(); try (final HoodieWriteClientWrapper hoodieWriteClient = getHoodieWriteClient(hoodieWriteConfig)) { final String commitTime = - this.commitTime.isPresent() ? this.commitTime.get() : hoodieWriteClient.startCommit(); + this.commitTime.isPresent() ? this.commitTime.get() : hoodieWriteClient.startCommit(); // Handle writes to hoodie. It can be an insert or upsert. final HoodieWriteResult result = handleWrite(hoodieWriteClient, hoodieRecords.getData(), commitTime, op); @@ -191,7 +194,7 @@ public void write(@NonNull final RDDWrapper> h protected void initDataset() { try { HoodieUtil.initHoodieDataset(FSUtils.getFs(this.hoodieConf.getConf(), - Optional.of(this.hoodieConf.getBasePath())), this.hoodieConf); + Optional.of(this.hoodieConf.getBasePath())), this.hadoopConf, this.hoodieConf); } catch (IOException e) { log.error("Error initializing hoodie dataset.", e); throw new JobRuntimeException("Could not initialize hoodie dataset", e); @@ -202,6 +205,7 @@ protected void initDataset() { * If {@link HoodieConfiguration#HOODIE_AUTO_TUNE_PARALLELISM} is enabled then it will use * {@link HoodieConfiguration#HOODIE_TARGET_FILE_SIZE} and {@link SinkStatManager#getAvgRecordSize()} to figure * out what should be the optimal insert parallelism. 
+ * * @param numRecords */ public boolean updateInsertParallelism(final long numRecords) { @@ -209,7 +213,7 @@ public boolean updateInsertParallelism(final long numRecords) { final int newParallelism = calculateNewBulkInsertParallelism(numRecords); if (0 < newParallelism) { this.hoodieConf.setTableProperty(HoodieConfiguration.HOODIE_INSERT_PARALLELISM, - Integer.toString(newParallelism)); + Integer.toString(newParallelism)); log.info("new hoodie insert parallelism is set to :{}", newParallelism); return true; } @@ -221,6 +225,7 @@ public boolean updateInsertParallelism(final long numRecords) { * If {@link HoodieConfiguration#HOODIE_AUTO_TUNE_PARALLELISM} is enabled then it will use * {@link HoodieConfiguration#HOODIE_TARGET_FILE_SIZE} and {@link SinkStatManager#getAvgRecordSize()} to figure * out what should be the optimal bulk insert parallelism. + * * @param numRecords */ public boolean updateBulkInsertParallelism(final long numRecords) { @@ -228,7 +233,7 @@ public boolean updateBulkInsertParallelism(final long numRecords) { final int newParallelism = calculateNewBulkInsertParallelism(numRecords); if (0 < newParallelism) { this.hoodieConf.setTableProperty(HoodieConfiguration.HOODIE_BULKINSERT_PARALLELISM, - Integer.toString(newParallelism)); + Integer.toString(newParallelism)); log.info("new hoodie bulk insert parallelism is set to :{}", newParallelism); return true; } @@ -244,7 +249,7 @@ protected int calculateNewBulkInsertParallelism(final long numRecords) { final int currentParallelism = this.hoodieConf.getBulkInsertParallelism(); log.info( "StatsManager:targetFileSize:{}:avgRecordSize:{}:numRecords:{}:" - + "newBulkInsertParallelism:{}:currentBulkInsertParallelism:{}", + + "newBulkInsertParallelism:{}:currentBulkInsertParallelism:{}", targetFileSize, avgRecordSize, numRecords, newParallelism, currentParallelism); return newParallelism; } @@ -252,8 +257,8 @@ protected int calculateNewBulkInsertParallelism(final long numRecords) { @VisibleForTesting protected HoodieWriteClientWrapper getHoodieWriteClient(@NonNull final HoodieWriteConfig hoodieWriteConfig) { final HoodieWriteClient hoodieWriteClient = - new HoodieWriteClient(this.jsc, hoodieWriteConfig, - this.hoodieConf.shouldRollbackInFlight()); + new HoodieWriteClient(this.jsc, hoodieWriteConfig, + this.hoodieConf.shouldRollbackInFlight()); return new HoodieWriteClientWrapper(hoodieWriteClient, this.bulkInsertPartitioner); } @@ -262,24 +267,24 @@ protected HoodieWriteClientWrapper getHoodieWriteClient(@NonNull final HoodieWri * {@link HoodieBasedMetadataManager#shouldSaveChanges()} flag. 
*/ public void commit(@NonNull final HoodieWriteClientWrapper hoodieWriteClient, - @NotEmpty final String commitTime, - @NonNull final Optional> writesStatuses) { + @NotEmpty final String commitTime, + @NonNull final Optional> writesStatuses) { this.commit(hoodieWriteClient, commitTime, writesStatuses, this.shouldSaveChangesInFuture); } public void commit(@NonNull final HoodieWriteClientWrapper hoodieWriteClient, - @NotEmpty final String commitTime, - @NonNull final Optional> writesStatuses, - final boolean shouldSaveChangesInFuture) { + @NotEmpty final String commitTime, + @NonNull final Optional> writesStatuses, + final boolean shouldSaveChangesInFuture) { updateSinkStat(writesStatuses); logWriteMetrics(writesStatuses); - java.util.Optional> hoodieExtraMetadata = java.util.Optional.empty(); + Option> hoodieExtraMetadata = Option.empty(); if (this.metadataMgr instanceof HoodieBasedMetadataManager) { // Retrieve metadata from metadata manager and update metadata manager to avoid it creating extra // hoodie commit. final HoodieBasedMetadataManager hoodieBasedMetadataManager = (HoodieBasedMetadataManager) this.metadataMgr; - hoodieExtraMetadata = java.util.Optional.of(hoodieBasedMetadataManager.getMetadataInfo()); + hoodieExtraMetadata = Option.of(hoodieBasedMetadataManager.getMetadataInfo()); if (!shouldSaveChangesInFuture) { hoodieBasedMetadataManager.shouldSaveChanges().set(false); } @@ -328,9 +333,9 @@ private void logWriteMetrics(final Optional> writesStatuses final LongAccumulator totalCount = writesStatuses.get().rdd().sparkContext().longAccumulator(); final LongAccumulator errorCount = writesStatuses.get().rdd().sparkContext().longAccumulator(); writesStatuses.get().foreach(writeStatus -> { - errorCount.add(writeStatus.getFailedRecords().size()); - totalCount.add(writeStatus.getTotalRecords()); - }); + errorCount.add(writeStatus.getFailedRecords().size()); + totalCount.add(writeStatus.getTotalRecords()); + }); this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.ERROR_ROWCOUNT, errorCount.value(), this.dataFeedMetricsTags); this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.OUTPUT_ROWCOUNT, @@ -341,6 +346,7 @@ private void logWriteMetrics(final Optional> writesStatuses /** * {@link #updateSinkStat(Optional)} will compute {@link SinkStat} and persist changes into {@link IMetadataManager}. * As a part of {@link SinkStat} computation; it will compute avg record size for current run. 
+ * * @param writesStatuses */ private void updateSinkStat(final Optional> writesStatuses) { @@ -349,16 +355,16 @@ private void updateSinkStat(final Optional> writesStatuses) final LongAccumulator fileCount = writesStatuses.get().rdd().sparkContext().longAccumulator(); final LongAccumulator totalSize = writesStatuses.get().rdd().sparkContext().longAccumulator(); writesStatuses.get().foreach( - writeStatus -> { - final long writeBytes = writeStatus.getStat().getTotalWriteBytes(); - final long numInserts = writeStatus.getStat().getNumWrites() - - writeStatus.getStat().getNumUpdateWrites(); - if (writeBytes > 0 && numInserts > 0) { - avgRecordSizeCounter.add(writeBytes / numInserts); + writeStatus -> { + final long writeBytes = writeStatus.getStat().getTotalWriteBytes(); + final long numInserts = writeStatus.getStat().getNumWrites() + - writeStatus.getStat().getNumUpdateWrites(); + if (writeBytes > 0 && numInserts > 0) { + avgRecordSizeCounter.add(writeBytes / numInserts); + } + fileCount.add(1); + totalSize.add(writeBytes); } - fileCount.add(1); - totalSize.add(writeBytes); - } ); final long avgRecordSize = (int) avgRecordSizeCounter.avg(); if (avgRecordSize > 0) { @@ -367,9 +373,9 @@ private void updateSinkStat(final Optional> writesStatuses) } if (this.dataFeedMetrics.isPresent()) { this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.TOTAL_FILE_COUNT, fileCount.value(), - this.dataFeedMetricsTags); + this.dataFeedMetricsTags); this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.TOTAL_WRITE_SIZE, totalSize.value(), - this.dataFeedMetricsTags); + this.dataFeedMetricsTags); } } this.sinkStatMgr.persist(); @@ -444,7 +450,7 @@ public HoodieWriteResult handleWrite( } private JavaRDD> dedupRecords(@NonNull final HoodieWriteClientWrapper writeClient, - @NonNull final JavaRDD> hoodieRecords) { + @NonNull final JavaRDD> hoodieRecords) { return writeClient.filterExists(hoodieRecords).persist(StorageLevel.DISK_ONLY()); } @@ -454,11 +460,11 @@ private JavaRDD> dedupRecords(@NonNull final H * see {@link UserDefinedBulkInsertPartitioner}. */ public static UserDefinedBulkInsertPartitioner getDataPartitioner(@NonNull final HoodieConfiguration hoodieConf, - @NonNull final Optional defaultDataPartitioner) { + @NonNull final Optional defaultDataPartitioner) { try { return (UserDefinedBulkInsertPartitioner) Class.forName(hoodieConf.getHoodieDataPartitioner( - defaultDataPartitioner.isPresent() ? defaultDataPartitioner.get() - : DefaultHoodieDataPartitioner.class.getName())).newInstance(); + defaultDataPartitioner.isPresent() ? 
defaultDataPartitioner.get() + : DefaultHoodieDataPartitioner.class.getName())).newInstance(); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) { throw new JobRuntimeException("exception in initializing data partitioner", e); } @@ -485,22 +491,22 @@ public void startCommitWithTime(@NotEmpty final String commitTime) { } public boolean commit(@NotEmpty final String commitTime, @NonNull final JavaRDD writeStatuses, - final java.util.Optional> extraMetadata) { + final Option> extraMetadata) { return this.hoodieWriteClient.commit(commitTime, writeStatuses, extraMetadata); } public JavaRDD insert(@NonNull final JavaRDD> records, - @NotEmpty final String commitTime) { + @NotEmpty final String commitTime) { return this.hoodieWriteClient.insert(records, commitTime); } public JavaRDD bulkInsert(@NonNull final JavaRDD> records, - @NotEmpty final String commitTime) { - return this.hoodieWriteClient.bulkInsert(records, commitTime, Option.apply(this.bulkInsertPartitioner)); + @NotEmpty final String commitTime) { + return this.hoodieWriteClient.bulkInsert(records, commitTime, Option.of(this.bulkInsertPartitioner)); } public JavaRDD upsert(@NonNull final JavaRDD> records, - @NotEmpty final String commitTime) { + @NotEmpty final String commitTime) { return this.hoodieWriteClient.upsert(records, commitTime); } @@ -522,7 +528,7 @@ public void close() { } public JavaRDD> filterExists( - final JavaRDD> hoodieRecords) { + final JavaRDD> hoodieRecords) { return this.hoodieWriteClient.filterExists(hoodieRecords); } } @@ -531,17 +537,29 @@ public JavaRDD> filterExists( * Supported hoodie write operations. */ public enum HoodieSinkOp { - /** {@link HoodieWriteClient#insert(JavaRDD, String)}*/ + /** + * {@link HoodieWriteClient#insert(JavaRDD, String)} + */ INSERT, - /** {@link HoodieWriteClient#bulkInsert(JavaRDD, String)}*/ + /** + * {@link HoodieWriteClient#bulkInsert(JavaRDD, String)} + */ BULK_INSERT, - /** {@link HoodieWriteClient#insert(JavaRDD, String)} {@link HoodieWriteClient#filterExists(JavaRDD)}*/ + /** + * {@link HoodieWriteClient#insert(JavaRDD, String)} {@link HoodieWriteClient#filterExists(JavaRDD)} + */ DEDUP_INSERT, - /** {@link HoodieWriteClient#bulkInsert(JavaRDD, String)} {@link HoodieWriteClient#filterExists(JavaRDD)}*/ + /** + * {@link HoodieWriteClient#bulkInsert(JavaRDD, String)} {@link HoodieWriteClient#filterExists(JavaRDD)} + */ DEDUP_BULK_INSERT, - /** {@link com.uber.hoodie.HoodieWriteClient#upsert(org.apache.spark.api.java.JavaRDD, java.lang.String)}*/ + /** + * {@link org.apache.hudi.HoodieWriteClient#upsert(org.apache.spark.api.java.JavaRDD, java.lang.String)} + */ UPSERT, - /** No operation */ + /** + * No operation + */ NO_OP } diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieWriteStatus.java b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieWriteStatus.java index eb54222..338eec5 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieWriteStatus.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieWriteStatus.java @@ -16,8 +16,10 @@ */ package com.uber.marmaray.common.sinks.hoodie; -import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.model.HoodieRecord; +import org.apache.hudi.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; + import java.util.Map; import java.util.Optional; @@ -28,12 +30,16 @@ public class HoodieWriteStatus extends WriteStatus { 
private long totalRecords; + public HoodieWriteStatus(Boolean trackSuccessRecords, Double failureFraction) { + super(trackSuccessRecords, failureFraction); + } + /** - * Overriding {@link #markSuccess(HoodieRecord, Optional)} to avoid caching - * {@link com.uber.hoodie.common.model.HoodieKey} for successfully written hoodie records. + * Overriding {@link #markSuccess(HoodieRecord, Option)} to avoid caching + * {@link org.apache.hudi.common.model.HoodieKey} for successfully written hoodie records. */ @Override - public void markSuccess(final HoodieRecord record, final Optional> optionalRecordMetadata) { + public void markSuccess(final HoodieRecord record, final Option> optionalRecordMetadata) { this.totalRecords++; } diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/partitioner/DefaultHoodieDataPartitioner.java b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/partitioner/DefaultHoodieDataPartitioner.java index 4d1054f..5a01a4a 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/partitioner/DefaultHoodieDataPartitioner.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/partitioner/DefaultHoodieDataPartitioner.java @@ -16,9 +16,9 @@ */ package com.uber.marmaray.common.sinks.hoodie.partitioner; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; import lombok.NonNull; import org.apache.spark.api.java.JavaRDD; import scala.Serializable; diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sources/file/JSONFileSourceDataConverter.java b/marmaray/src/main/java/com/uber/marmaray/common/sources/file/JSONFileSourceDataConverter.java index 5d0ae66..3fcad4b 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sources/file/JSONFileSourceDataConverter.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sources/file/JSONFileSourceDataConverter.java @@ -17,7 +17,7 @@ package com.uber.marmaray.common.sources.file; -import com.uber.hoodie.avro.MercifulJsonConverter; +import org.apache.hudi.avro.MercifulJsonConverter; import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; import com.uber.marmaray.common.converters.converterresult.ConverterResult; diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaWorkUnitCalculator.java b/marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaWorkUnitCalculator.java index 3b39dfe..901e6e0 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaWorkUnitCalculator.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaWorkUnitCalculator.java @@ -68,7 +68,7 @@ @Slf4j @RequiredArgsConstructor public class KafkaWorkUnitCalculator implements IWorkUnitCalculator { + KafkaWorkUnitCalculatorResult, StringValue> { public static final String KAFKA_METADATA_PREFIX = "kafka_metadata"; public static final String KAFKA_METADATA_WITH_SEPARATOR = KAFKA_METADATA_PREFIX + StringTypes.COLON; @@ -116,27 +116,28 @@ public void initPreviousRunState(@NonNull final IMetadataManager me final String topicSpecificName = getTopicSpecificMetadataKey(topicName); final List toDelete = new LinkedList<>(); metadataManager.getAllKeys().forEach(key -> { - if 
(key.startsWith(topicSpecificName)) { - // this is my specific topic - metadata.put(Integer.parseInt(key.substring(topicSpecificName.length())), + if (key.startsWith(topicSpecificName)) { + // this is my specific topic + metadata.put(Integer.parseInt(key.substring(topicSpecificName.length())), Long.parseLong(metadataManager.get(key).get().getValue())); - } else if (key.startsWith(KAFKA_METADATA_WITH_SEPARATOR)) { - // this is a specific topic, but not mine. ignore. - assert true; - } else if (key.startsWith(KAFKA_METADATA_PREFIX)) { - // this is unspecified topic - metadata.put(Integer.parseInt(key.substring(KAFKA_METADATA_PREFIX.length())), + } else if (key.startsWith(KAFKA_METADATA_WITH_SEPARATOR)) { + // this is a specific topic, but not mine. ignore. + assert true; + } else if (key.startsWith(KAFKA_METADATA_PREFIX)) { + // this is unspecified topic + metadata.put(Integer.parseInt(key.substring(KAFKA_METADATA_PREFIX.length())), Long.parseLong(metadataManager.get(key).get().getValue())); - // delete the old, unspecified metadata - toDelete.add(key); - } - }); + // delete the old, unspecified metadata + toDelete.add(key); + } + }); toDelete.forEach(metadataManager::remove); this.previousRunState = Optional.of(new KafkaRunState(metadata)); } /** * Get the specific metadata name for kafka that we'll be using + * * @param topicName the name of the topic to get the metadata for * @return the processed name */ @@ -150,9 +151,9 @@ public void saveNextRunState(@NonNull final IMetadataManager metada final String topicName = this.conf.getTopicName(); final String topicSpecificName = getTopicSpecificMetadataKey(topicName); nextRunState.getPartitionOffsets().entrySet().forEach( - entry -> { - metadataManager.set(topicSpecificName + entry.getKey(), new StringValue(entry.getValue().toString())); - }); + entry -> { + metadataManager.set(topicSpecificName + entry.getKey(), new StringValue(entry.getValue().toString())); + }); } @Override @@ -174,15 +175,15 @@ public KafkaWorkUnitCalculatorResult computeWorkUnits() { // Read checkpointed topic partition offsets and update it with newly added partitions. final Map oldPartitionOffsets = readExistingPartitionOffsets(); - if (oldPartitionOffsets.isEmpty()) { - // If it's a first run then initialize new partitions with latest partition offsets. - return new KafkaWorkUnitCalculatorResult(Collections.emptyList(), - new KafkaRunState(this.offsetSelector.getPartitionOffsets( - this.conf, - latestLeaderOffsets.keySet(), - earliestLeaderOffsets, - latestLeaderOffsets))); - } +// if (oldPartitionOffsets.isEmpty()) { +// // If it's a first run then initialize new partitions with latest partition offsets. 
+// return new KafkaWorkUnitCalculatorResult(Collections.emptyList(), +// new KafkaRunState(this.offsetSelector.getPartitionOffsets( +// this.conf, +// latestLeaderOffsets.keySet(), +// earliestLeaderOffsets, +// latestLeaderOffsets))); +// } final Map newPartitionOffsets = updatePartitionStartOffsets(oldPartitionOffsets, earliestLeaderOffsets, latestLeaderOffsets); @@ -190,12 +191,12 @@ public KafkaWorkUnitCalculatorResult computeWorkUnits() { long totalNewMessages = 0; final List partitionMessages = new ArrayList<>(latestLeaderOffsets.size()); for (Entry entry : latestLeaderOffsets.entrySet()) { - if (!newPartitionOffsets.containsKey(entry.getKey().partition())) { - log.error("Unable to find offsets for topic {} partition {}", - entry.getKey().topic(), entry.getKey().partition()); - continue; - } - final long messages = entry.getValue() - newPartitionOffsets.get(entry.getKey().partition()); +// if (!newPartitionOffsets.containsKey(entry.getKey().partition())) { +// log.error("Unable to find offsets for topic {} partition {}", +// entry.getKey().topic(), entry.getKey().partition()); +// continue; +// } + final long messages = entry.getValue(); log.debug("topicPartition:{}:messages:{}:latestOffset:{}", entry.getKey(), messages, entry.getValue()); if (messages == 0) { continue; @@ -209,12 +210,12 @@ public KafkaWorkUnitCalculatorResult computeWorkUnits() { return new KafkaWorkUnitCalculatorResult(Collections.emptyList(), new KafkaRunState(newPartitionOffsets)); } final List workUnits = - calculatePartitionOffsetRangesToRead(partitionMessages, newPartitionOffsets, - totalNewMessages); + calculatePartitionOffsetRangesToRead(partitionMessages, newPartitionOffsets, + totalNewMessages); // compute run state for the next run. final KafkaRunState nextRunState = createNextRunState(workUnits); final KafkaWorkUnitCalculatorResult kafkaWorkUnitCalculatorResult = - new KafkaWorkUnitCalculatorResult(workUnits, nextRunState); + new KafkaWorkUnitCalculatorResult(workUnits, nextRunState); computeRunMetrics(latestLeaderOffsets, nextRunState, workUnits); log.info("workunits: {}", kafkaWorkUnitCalculatorResult); @@ -222,14 +223,14 @@ public KafkaWorkUnitCalculatorResult computeWorkUnits() { } private List calculatePartitionOffsetRangesToRead( - @NonNull final List partitionMessages, - @NonNull final Map partitionStartOffsets, final long numMessages) { + @NonNull final List partitionMessages, + @NonNull final Map partitionStartOffsets, final long numMessages) { // This will make sure that we can read more messages from partition with more than average messages per // partition at the same time we will read all the messages from partition with less than avg messags. 
Collections.sort(partitionMessages); final long maxMessagesToRead = this.conf.getMaxMessagesToRead(); log.info("topicName:{}:newMessages:{}:maxMessagesToRead:{}", this.conf.getTopicName(), numMessages, - maxMessagesToRead); + maxMessagesToRead); final boolean hasExtraMessages = numMessages > maxMessagesToRead; final long numMessagesToRead = Math.min(numMessages, maxMessagesToRead); @@ -247,8 +248,8 @@ private List calculatePartitionOffsetRangesToRead( } if (numMsgsToBeSelected > 0) { offsetRanges.add(OffsetRange.create(m.getTopicPartition(), - partitionStartOffsets.get(m.getTopicPartition().partition()), - partitionStartOffsets.get(m.getTopicPartition().partition()) + numMsgsToBeSelected)); + partitionStartOffsets.get(m.getTopicPartition().partition()), + partitionStartOffsets.get(m.getTopicPartition().partition()) + numMsgsToBeSelected)); } } return offsetRanges; @@ -267,21 +268,21 @@ private Map readExistingPartitionOffsets() { private KafkaRunState createNextRunState(@NonNull final List workUnits) { final Map partitionOffsets = new HashMap<>(); workUnits.forEach( - offsetRange -> { - final int partition = offsetRange.partition(); - if (partitionOffsets.containsKey(partition)) { - partitionOffsets - .put(partition, Math.max(partitionOffsets.get(partition), offsetRange.untilOffset())); - } else { - partitionOffsets.put(partition, offsetRange.untilOffset()); + offsetRange -> { + final int partition = offsetRange.partition(); + if (partitionOffsets.containsKey(partition)) { + partitionOffsets + .put(partition, Math.max(partitionOffsets.get(partition), offsetRange.untilOffset())); + } else { + partitionOffsets.put(partition, offsetRange.untilOffset()); + } } - } ); return new KafkaRunState(partitionOffsets); } - private Map buildResetPartitionOffsetMap(@NonNull final Map earliestTPOffsets, - @NonNull final Map latestTPOffsets) { + private Map buildResetPartitionOffsetMap(@NonNull final Map earliestTPOffsets, + @NonNull final Map latestTPOffsets) { Preconditions.checkState(kafkaOffsetResetter.isPresent(), "KafkaOffsetResetter should be present " + "when this method is called"); @@ -324,7 +325,7 @@ private Map handleDataLossAndMaybeResetOffsets(@NonNull final Map final long lossStartOffset, final long lossEndOffset) { final String errMsg = String.format("DATA_LOSS:MISSED_KAFKA_MESSAGES:topic:%s:partition:%d:" - + "startOffset:%d:endOffset:%d", topicPartition.topic(), + + "startOffset:%d:endOffset:%d", topicPartition.topic(), topicPartition.partition(), lossStartOffset, lossEndOffset); log.error(errMsg); if (kafkaOffsetResetter.isPresent()) { @@ -342,24 +343,23 @@ private Map handleDataLossAndMaybeResetOffsets(@NonNull final Map private Map updatePartitionStartOffsets(@NonNull final Map partitionOffsetMap, @NonNull final Map earliestTPOffsets, @NonNull final Map latestTPOffsets) { - if (!partitionOffsetMap.isEmpty()) { - for (Entry entry : earliestTPOffsets.entrySet()) { - final TopicPartition topicPartition = entry.getKey(); - if (!partitionOffsetMap.containsKey(topicPartition.partition())) { - // New partition is found. 
- log.info("Found new partition for topic:{}:partition:{}", topicPartition.topic(), - topicPartition.partition()); - partitionOffsetMap.put(topicPartition.partition(), entry.getValue()); - } else if (entry.getValue() > partitionOffsetMap.get(topicPartition.partition())) { - // data loss detected - return handleDataLossAndMaybeResetOffsets(earliestTPOffsets, latestTPOffsets, - topicPartition, partitionOffsetMap, - partitionOffsetMap.get(topicPartition.partition()), entry.getValue()); - } + for (Entry entry : earliestTPOffsets.entrySet()) { + final TopicPartition topicPartition = entry.getKey(); + if (!partitionOffsetMap.containsKey(topicPartition.partition())) { + // New partition is found. + log.info("Found new partition for topic:{}:partition:{}", topicPartition.topic(), + topicPartition.partition()); + partitionOffsetMap.put(topicPartition.partition(), entry.getValue()); + } else if (entry.getValue() > partitionOffsetMap.get(topicPartition.partition())) { + // data loss detected + return handleDataLossAndMaybeResetOffsets(earliestTPOffsets, latestTPOffsets, + topicPartition, partitionOffsetMap, + partitionOffsetMap.get(topicPartition.partition()), entry.getValue()); } } return partitionOffsetMap; } + /* Creates metrics for the current execution based on the source. */ @@ -376,22 +376,22 @@ private void computeRunMetrics(@NonNull final Map latestLe totalTags.put(PARTITION_TAG, TOTAL_PARTITION); final MessageCounters counter = new MessageCounters(); offsetRanges.forEach(offsetRange -> { - final Long oldCount = offsetMap.getOrDefault(offsetRange.topicPartition(), 0L); - offsetMap.put(offsetRange.topicPartition(), oldCount + offsetRange.count()); - }); + final Long oldCount = offsetMap.getOrDefault(offsetRange.topicPartition(), 0L); + offsetMap.put(offsetRange.topicPartition(), oldCount + offsetRange.count()); + }); latestLeaderOffsets.forEach( - (topicPartition, leaderOffset) -> - computePartitionMetrics( - topicPartition, leaderOffset, nextRunState, - topicMetrics, offsetMap.getOrDefault(topicPartition, 0L), counter) + (topicPartition, leaderOffset) -> + computePartitionMetrics( + topicPartition, leaderOffset, nextRunState, + topicMetrics, offsetMap.getOrDefault(topicPartition, 0L), counter) ); topicMetrics.createLongMetric(DataFeedMetricNames.ROWCOUNT_BEHIND, - counter.getTotalAvailable() - counter.getTotalCurrent(), totalTags); + counter.getTotalAvailable() - counter.getTotalCurrent(), totalTags); topicMetrics.createLongMetric(DataFeedMetricNames.INPUT_ROWCOUNT, counter.getTotalInput(), totalTags); if (this.chargebackCalculator.isPresent()) { this.chargebackCalculator.get().addCost( - this.topicMetrics.get().getBaseTags().get(DataFeedMetrics.DATA_FEED_NAME), + this.topicMetrics.get().getBaseTags().get(DataFeedMetrics.DATA_FEED_NAME), ChargebackMetricType.ROW_COUNT, counter.getTotalInput()); } } @@ -426,7 +426,7 @@ private void computePartitionMetrics(@NonNull final TopicPartition topicPartitio * It holds current set of work units and also {@link KafkaRunState} for the next run. 
*/ public final class KafkaWorkUnitCalculatorResult implements - IWorkUnitCalculator.IWorkUnitCalculatorResult { + IWorkUnitCalculator.IWorkUnitCalculatorResult { @Getter private final KafkaRunState nextRunState; @@ -455,9 +455,9 @@ public String toString() { final StringBuilder sb = new StringBuilder(); sb.append("offsetRanges="); this.workUnits.forEach( - workUnit -> sb.append( - workUnit.partition()).append(":").append(workUnit.fromOffset()).append("->") - .append(workUnit.untilOffset()).append(";")); + workUnit -> sb.append( + workUnit.partition()).append(":").append(workUnit.fromOffset()).append("->") + .append(workUnit.untilOffset()).append(";")); return sb.toString(); } diff --git a/marmaray/src/main/java/com/uber/marmaray/common/spark/SparkFactory.java b/marmaray/src/main/java/com/uber/marmaray/common/spark/SparkFactory.java index 7e700c7..324e543 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/spark/SparkFactory.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/spark/SparkFactory.java @@ -18,8 +18,8 @@ package com.uber.marmaray.common.spark; import com.google.common.base.Optional; -import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.CassandraSinkConfiguration; import com.uber.marmaray.common.configuration.Configuration; diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java new file mode 100644 index 0000000..909a41a --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -0,0 +1,297 @@ +package com.uber.marmaray.examples.job; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.google.common.base.Optional; +import com.uber.marmaray.common.configuration.*; +import com.uber.marmaray.common.converters.data.*; +import com.uber.marmaray.common.exceptions.JobRuntimeException; +import com.uber.marmaray.common.job.JobDag; +import com.uber.marmaray.common.job.JobManager; +import com.uber.marmaray.common.metadata.HoodieBasedMetadataManager; +import com.uber.marmaray.common.metadata.IMetadataManager; +import com.uber.marmaray.common.metrics.DataFeedMetricNames; +import com.uber.marmaray.common.metrics.DataFeedMetrics; +import com.uber.marmaray.common.metrics.ErrorCauseTagNames; +import com.uber.marmaray.common.metrics.JobMetricNames; +import com.uber.marmaray.common.metrics.JobMetrics; +import com.uber.marmaray.common.metrics.LongMetric; +import com.uber.marmaray.common.metrics.ModuleTagNames; +import com.uber.marmaray.common.metrics.TimerMetric; +import com.uber.marmaray.common.reporters.ConsoleReporter; +import com.uber.marmaray.common.reporters.Reporters; +import com.uber.marmaray.common.schema.kafka.KafkaSchemaJSONServiceReader; +import com.uber.marmaray.common.sinks.hoodie.HoodieSink; +import com.uber.marmaray.common.sources.ISource; +import com.uber.marmaray.common.sources.IWorkUnitCalculator; +import com.uber.marmaray.common.sources.kafka.KafkaSource; +import com.uber.marmaray.common.sources.kafka.KafkaWorkUnitCalculator; +import com.uber.marmaray.common.spark.SparkArgs; +import com.uber.marmaray.common.spark.SparkFactory; +import com.uber.marmaray.utilities.SparkUtil; +import com.uber.marmaray.utilities.ErrorExtractor; +import 
com.uber.marmaray.utilities.FSUtils; +import com.uber.marmaray.utilities.JobUtil; +import com.uber.marmaray.utilities.listener.TimeoutManager; +import lombok.Getter; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import org.hibernate.validator.constraints.NotEmpty; +import parquet.Preconditions; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.uber.marmaray.common.configuration.Configuration; +import com.uber.marmaray.common.converters.data.HoodieSinkDataConverter; + + +/** + * Job to load data from kafka to hoodie + */ +@Slf4j +public class KafkaToHoodieJob { + /** + * Generic entry point + * + * @param args arguments for the job, from the command line + * @throws IOException Exception + */ + public static void main(final String[] args) throws IOException { + new KafkaToHoodieJob().run(args); + } + + /** + * Main execution method for the job. + * + * @param args command line arguments + * @throws IOException Exception + */ + private void run(final String[] args) throws IOException { + + final Instant jobStartTime = Instant.now(); + + final Configuration conf = getConfiguration(args); + + final Reporters reporters = new Reporters(); + reporters.addReporter(new ConsoleReporter()); + + final Map metricTags = Collections.emptyMap(); + final DataFeedMetrics dataFeedMetrics = new DataFeedMetrics("KafkaToHoodieJob", metricTags); + + log.info("Initializing configurations for job"); + final TimerMetric confInitMetric = new TimerMetric(DataFeedMetricNames.INIT_CONFIG_LATENCY_MS, + metricTags); + + final KafkaSourceConfiguration kafkaSourceConf; + final HoodieConfiguration hoodieConf; + final HadoopConfiguration hadoopConf; + try { + hadoopConf = new HadoopConfiguration(conf); + kafkaSourceConf = new KafkaSourceConfiguration(conf); + hoodieConf = new HoodieConfiguration(conf, "test_hoodie"); + } catch (final Exception e) { + final LongMetric configError = new LongMetric(DataFeedMetricNames.DISPERSAL_CONFIGURATION_INIT_ERRORS, 1); + configError.addTags(metricTags); + configError.addTags(DataFeedMetricNames + .getErrorModuleCauseTags(ModuleTagNames.CONFIGURATION, ErrorCauseTagNames.CONFIG_ERROR)); + reporters.report(configError); + reporters.getReporters().forEach(dataFeedMetrics::gauageFailureMetric); + throw e; + } + confInitMetric.stop(); + reporters.report(confInitMetric); + + log.info("Reading schema"); + final TimerMetric convertSchemaLatencyMs = + new TimerMetric(DataFeedMetricNames.CONVERT_SCHEMA_LATENCY_MS, metricTags); + + final Schema outputSchema = new Schema.Parser().parse(hoodieConf.getHoodieWriteConfig().getSchema()); + convertSchemaLatencyMs.stop(); + reporters.report(convertSchemaLatencyMs); + + final SparkArgs sparkArgs = new SparkArgs( + Collections.singletonList(outputSchema), + SparkUtil.getSerializationClasses(), + conf); + final SparkFactory sparkFactory = new SparkFactory(sparkArgs); + final JobManager jobManager = JobManager.createJobManager(conf, "marmaray", + "frequency", sparkFactory, reporters); + + final JavaSparkContext jsc = sparkFactory.getSparkContext(); + + log.info("Initializing metadata manager for job"); + final TimerMetric metadataManagerInitMetric = + new 
TimerMetric(DataFeedMetricNames.INIT_METADATAMANAGER_LATENCY_MS, metricTags); + final IMetadataManager metadataManager; + try { + metadataManager = initMetadataManager(hadoopConf, hoodieConf, jsc); + } catch (final JobRuntimeException e) { + final LongMetric configError = new LongMetric(DataFeedMetricNames.DISPERSAL_CONFIGURATION_INIT_ERRORS, 1); + configError.addTags(metricTags); + configError.addTags(DataFeedMetricNames + .getErrorModuleCauseTags(ModuleTagNames.METADATA_MANAGER, ErrorCauseTagNames.CONFIG_ERROR)); + reporters.report(configError); + reporters.getReporters().forEach(dataFeedMetrics::gauageFailureMetric); + throw e; + } + metadataManagerInitMetric.stop(); + reporters.report(metadataManagerInitMetric); + + try { + log.info("Initializing converters & schemas for job"); + final SQLContext sqlContext = SQLContext.getOrCreate(jsc.sc()); + + log.info("Common schema is: {}", outputSchema.toString()); + + // Schema + log.info("Initializing source data converter"); + KafkaSchemaJSONServiceReader serviceReader = new KafkaSchemaJSONServiceReader(outputSchema); + final KafkaSourceDataConverter dataConverter = new KafkaSourceDataConverter(serviceReader, conf, + new ErrorExtractor()); + + log.info("Initializing source & sink for job"); + final ISource kafkaSource = new KafkaSource(kafkaSourceConf, Optional.of(jsc), dataConverter, + Optional.absent(), Optional.absent()); + + // Sink + HoodieSinkDataConverter hoodieSinkDataConverter = new HoodieSinkDataConverter(conf, new ErrorExtractor(), + hoodieConf); + HoodieSink hoodieSink = new HoodieSink(hoodieConf, hadoopConf, hoodieSinkDataConverter, jsc, metadataManager, + Optional.absent()); + + log.info("Initializing work unit calculator for job"); + final IWorkUnitCalculator workUnitCalculator = new KafkaWorkUnitCalculator(kafkaSourceConf); + + log.info("Initializing job dag"); + final JobDag jobDag = new JobDag(kafkaSource, hoodieSink, metadataManager, workUnitCalculator, + "test", "test", new JobMetrics("marmaray"), dataFeedMetrics, + reporters); + + jobManager.addJobDag(jobDag); + + log.info("Running ingestion job"); + try { + jobManager.run(); + JobUtil.raiseExceptionIfStatusFailed(jobManager.getJobManagerStatus()); + } catch (final Throwable t) { + if (TimeoutManager.getTimedOut()) { + final LongMetric runTimeError = new LongMetric(DataFeedMetricNames.MARMARAY_JOB_ERROR, 1); + runTimeError.addTags(metricTags); + runTimeError.addTags(DataFeedMetricNames.getErrorModuleCauseTags( + ModuleTagNames.JOB_MANAGER, ErrorCauseTagNames.TIME_OUT)); + reporters.report(runTimeError); + } + final LongMetric configError = new LongMetric(JobMetricNames.RUN_JOB_ERROR_COUNT, 1); + configError.addTags(metricTags); + reporters.report(configError); + throw t; + } + log.info("Ingestion job has been completed"); + + final TimerMetric jobLatencyMetric = + new TimerMetric(JobMetricNames.RUN_JOB_DAG_LATENCY_MS, metricTags, jobStartTime); + jobLatencyMetric.stop(); + reporters.report(jobLatencyMetric); + reporters.finish(); + } finally { + jsc.stop(); + } + } + + /** + * Get configuration from command line + * + * @param args command line arguments passed in + * @return configuration populated from them + */ + private Configuration getConfiguration(@NotEmpty final String[] args) { + final KafkaToHoodieCommandLineOptions options = new KafkaToHoodieCommandLineOptions(args); + if (options.getConfFile() != null) { + return getFileConfiguration(options.getConfFile()); + } else if (options.getJsonConf() != null) { + return getJsonConfiguration(options.getJsonConf()); + } 
else { + throw new JobRuntimeException("Unable to find conf; this shouldn't be possible"); + } + } + + /** + * Get configuration from JSON-based configuration + * + * @param jsonConf JSON string of configuration + * @return configuration populated from it + */ + private Configuration getJsonConfiguration(@NotEmpty final String jsonConf) { + final Configuration conf = new Configuration(); + conf.loadYamlStream(IOUtils.toInputStream(jsonConf), Optional.absent()); + return conf; + } + + /** + * Load configuration from a file on HDFS + * + * @param filePath path to the HDFS file to load + * @return configuration populated from it + */ + private Configuration getFileConfiguration(@NotEmpty final String filePath) { + final Configuration conf = new Configuration(); + try { + final FileSystem fs = FSUtils.getFs(conf, Optional.absent()); + final Path dataFeedConfFile = new Path(filePath); + log.info("Loading configuration from {}", dataFeedConfFile.toString()); + conf.loadYamlStream(fs.open(dataFeedConfFile), Optional.absent()); + } catch (IOException e) { + final String errorMessage = String.format("Unable to find configuration for %s", filePath); + log.error(errorMessage); + throw new JobRuntimeException(errorMessage, e); + } + return conf; + + } + + /** + * Initialize the metadata store system + * + * @param conf configuration to use + * @param jsc Java spark context + * @return metadata manager + */ + private static IMetadataManager initMetadataManager(@NonNull final HadoopConfiguration hadoopConf, + @NonNull final HoodieConfiguration conf, + @NonNull final JavaSparkContext jsc) { + log.info("Create metadata manager"); + try { + return new HoodieBasedMetadataManager(conf, hadoopConf, new AtomicBoolean(true), jsc); + } catch (IOException e) { + throw new JobRuntimeException("Unable to create metadata manager", e); + } + } + + private static final class KafkaToHoodieCommandLineOptions { + @Getter + @Parameter(names = {"--configurationFile", "-c"}, description = "path to configuration file") + private String confFile; + + @Getter + @Parameter(names = {"--jsonConfiguration", "-j"}, description = "json configuration") + private String jsonConf; + + private KafkaToHoodieCommandLineOptions(@NonNull final String[] args) { + final JCommander commander = new JCommander(this); + commander.parse(args); + Preconditions.checkState(this.confFile != null || this.jsonConf != null, + "One of jsonConfiguration or configurationFile must be specified"); + } + } + +} diff --git a/marmaray/src/main/java/com/uber/marmaray/utilities/ErrorTableUtil.java b/marmaray/src/main/java/com/uber/marmaray/utilities/ErrorTableUtil.java index 0a0446f..860cd49 100644 --- a/marmaray/src/main/java/com/uber/marmaray/utilities/ErrorTableUtil.java +++ b/marmaray/src/main/java/com/uber/marmaray/utilities/ErrorTableUtil.java @@ -16,12 +16,13 @@ */ package com.uber.marmaray.utilities; -import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import com.uber.marmaray.common.HoodieErrorPayload; import com.uber.marmaray.common.configuration.Configuration; import com.uber.marmaray.common.configuration.ErrorTableConfiguration; +import com.uber.marmaray.common.configuration.HadoopConfiguration; import com.uber.marmaray.common.configuration.HoodieConfiguration; import 
com.uber.marmaray.common.converters.data.DummyHoodieSinkDataConverter; import com.uber.marmaray.common.data.ErrorData; @@ -98,6 +99,7 @@ public static void writeErrorRecordsToErrorTable(@NonNull final SparkContext sc, JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); final ErrorTableConfiguration errorTableConf = new ErrorTableConfiguration(conf); + final HadoopConfiguration hadoopConf = new HadoopConfiguration(conf); if (!errorTableConf.isEnabled()) { return; } @@ -118,10 +120,9 @@ public static void writeErrorRecordsToErrorTable(@NonNull final SparkContext sc, try { final HoodieBasedMetadataManager metadataManager = - new HoodieBasedMetadataManager(hoodieConf, shouldSaveChanges, jsc); - final HoodieSink hoodieSink = new HoodieErrorSink(hoodieConf, new DummyHoodieSinkDataConverter(), jsc, - HoodieSink.HoodieSinkOp.BULK_INSERT, metadataManager, - false); + new HoodieBasedMetadataManager(hoodieConf, hadoopConf, shouldSaveChanges, jsc); + final HoodieSink hoodieSink = new HoodieErrorSink(hoodieConf, hadoopConf, new DummyHoodieSinkDataConverter(), + jsc, metadataManager, false); JavaRDD errorRecords = errorData.getData().map(error -> generateGenericErrorRecord( errorExtractor, errorTableSchema, error, applicationId)); @@ -160,7 +161,8 @@ public static void initErrorTableDataset(@NonNull final Configuration conf, @Not .withTableName(errorTableName) .enableMetrics(false) .build(); - HoodieUtil.initHoodieDataset(FSUtils.getFs(conf, Optional.of(hoodieConf.getBasePath())), hoodieConf); + final HadoopConfiguration hadoopConf = new HadoopConfiguration(conf); + HoodieUtil.initHoodieDataset(FSUtils.getFs(conf, Optional.of(hoodieConf.getBasePath())), hadoopConf, hoodieConf); } public static void addErrorSchemaConfiguration( diff --git a/marmaray/src/main/java/com/uber/marmaray/utilities/HoodieSinkErrorExtractor.java b/marmaray/src/main/java/com/uber/marmaray/utilities/HoodieSinkErrorExtractor.java index 9ee508c..a74e29c 100644 --- a/marmaray/src/main/java/com/uber/marmaray/utilities/HoodieSinkErrorExtractor.java +++ b/marmaray/src/main/java/com/uber/marmaray/utilities/HoodieSinkErrorExtractor.java @@ -16,8 +16,8 @@ */ package com.uber.marmaray.utilities; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import com.uber.marmaray.common.data.ErrorData; import com.uber.marmaray.common.data.RawData; diff --git a/marmaray/src/main/java/com/uber/marmaray/utilities/HoodieUtil.java b/marmaray/src/main/java/com/uber/marmaray/utilities/HoodieUtil.java index 870767f..1dec317 100644 --- a/marmaray/src/main/java/com/uber/marmaray/utilities/HoodieUtil.java +++ b/marmaray/src/main/java/com/uber/marmaray/utilities/HoodieUtil.java @@ -17,10 +17,11 @@ package com.uber.marmaray.utilities; import com.google.common.base.Optional; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.common.table.HoodieTableConfig; -import com.uber.hoodie.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import com.uber.marmaray.common.configuration.HadoopConfiguration; import com.uber.marmaray.common.configuration.HoodieConfiguration; import 
com.uber.marmaray.common.exceptions.JobRuntimeException; import lombok.NonNull; @@ -44,18 +45,18 @@ private HoodieUtil() { /** * It initializes hoodie dataset * @param fs {@link FileSystem} + * @param hadoopConf {@link HadoopConfiguration} * @param hoodieConf {@link HoodieConfiguration} * @throws IOException */ - public static void initHoodieDataset(@NonNull final FileSystem fs, + public static void initHoodieDataset(@NonNull final FileSystem fs, @NonNull final HadoopConfiguration hadoopConf, @NonNull final HoodieConfiguration hoodieConf) throws IOException { final Path hoodieMetaFolder = new Path(hoodieConf.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME); final Path hoodiePropertiesFile = new Path(hoodieMetaFolder.toString(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); if (!fs.exists(hoodiePropertiesFile)) { HoodieTableMetaClient - .initializePathAsHoodieDataset(FSUtils.getFs(hoodieConf.getConf(), - Optional.of(hoodieConf.getBasePath())), + .initDatasetAndGetMetaClient(hadoopConf.getHadoopConf(), hoodieConf.getBasePath(), hoodieConf.getHoodieInitProperties()); } } diff --git a/marmaray/src/main/java/com/uber/marmaray/utilities/SparkUtil.java b/marmaray/src/main/java/com/uber/marmaray/utilities/SparkUtil.java index 9a60b61..a100fb4 100644 --- a/marmaray/src/main/java/com/uber/marmaray/utilities/SparkUtil.java +++ b/marmaray/src/main/java/com/uber/marmaray/utilities/SparkUtil.java @@ -18,10 +18,10 @@ import com.google.common.base.Optional; import com.google.common.collect.Sets; -import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.HoodieRollbackStat; -import com.uber.hoodie.common.model.HoodieRecordLocation; -import com.uber.hoodie.common.model.HoodieWriteStat; +import org.apache.hudi.WriteStatus; +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieWriteStat; import com.uber.marmaray.common.HoodieErrorPayload; import com.uber.marmaray.common.exceptions.JobRuntimeException; import lombok.NonNull; diff --git a/marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieConfiguration.java b/marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieConfiguration.java index f83bd74..a445243 100644 --- a/marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieConfiguration.java +++ b/marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieConfiguration.java @@ -16,7 +16,7 @@ */ package com.uber.marmaray.common.configuration; -import com.uber.hoodie.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieAvroPayload; import com.uber.marmaray.common.AvroPayload; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; diff --git a/marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieIndexConfiguration.java b/marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieIndexConfiguration.java index ceb2a6a..d77332a 100644 --- a/marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieIndexConfiguration.java +++ b/marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieIndexConfiguration.java @@ -18,7 +18,7 @@ package com.uber.marmaray.common.configuration; import com.google.common.base.Optional; -import com.uber.hoodie.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieIndexConfig; import lombok.NonNull; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; diff --git 
a/marmaray/src/test/java/com/uber/marmaray/common/metadata/TestHoodieBasedMetadataManager.java b/marmaray/src/test/java/com/uber/marmaray/common/metadata/TestHoodieBasedMetadataManager.java index b1be2e4..ee10bf1 100644 --- a/marmaray/src/test/java/com/uber/marmaray/common/metadata/TestHoodieBasedMetadataManager.java +++ b/marmaray/src/test/java/com/uber/marmaray/common/metadata/TestHoodieBasedMetadataManager.java @@ -16,7 +16,7 @@ */ package com.uber.marmaray.common.metadata; -import com.uber.hoodie.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableMetaClient; import com.uber.marmaray.common.configuration.Configuration; import com.uber.marmaray.common.configuration.HadoopConfiguration; import com.uber.marmaray.common.configuration.HoodieConfiguration; @@ -56,8 +56,9 @@ public void testHoodieMetadataManager() throws IOException { .withBasePath(basePath.toString()).withSchema(schemaStr).withMetricsPrefix("hoodieMetricsPrefix") .enableMetrics(false).build(); final AtomicBoolean condition = new AtomicBoolean(true); + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); final HoodieBasedMetadataManager mgr = - new HoodieBasedMetadataManager(hoodieConf, condition, this.jsc.get()); + new HoodieBasedMetadataManager(hoodieConf, hadoopConf, condition, this.jsc.get()); // When no previous metadata is present then metadata map is expected to be empty. Assert.assertEquals(0, mgr.getAll().size()); @@ -93,7 +94,7 @@ public void testHoodieMetadataManager() throws IOException { // Now let's create another final AtomicBoolean condition2 = new AtomicBoolean(true); final HoodieBasedMetadataManager mgr2 = - new HoodieBasedMetadataManager(hoodieConf, condition2, this.jsc.get()); + new HoodieBasedMetadataManager(hoodieConf, hadoopConf, condition2, this.jsc.get()); Assert.assertEquals(1, mgr2.getAll().size()); Assert.assertFalse(mgr2.getMetadataInfo().get(HoodieBasedMetadataManager.HOODIE_METADATA_KEY).isEmpty()); Assert.assertEquals(mgr.getMetadataInfo(), mgr2.getMetadataInfo()); @@ -110,7 +111,9 @@ public void testRemove() throws Exception { .withBasePath(basePath.toString()).withSchema(schemaStr).withMetricsPrefix("hoodieMetricsPrefix") .enableMetrics(false).build(); final AtomicBoolean condition = new AtomicBoolean(true); - final HoodieBasedMetadataManager mgr = new HoodieBasedMetadataManager(hoodieConf, condition, this.jsc.get()); + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); + final HoodieBasedMetadataManager mgr = new HoodieBasedMetadataManager(hoodieConf, hadoopConf, condition, + this.jsc.get()); // set up default final String testKey = "partition1"; @@ -119,13 +122,15 @@ public void testRemove() throws Exception { mgr.saveChanges(); // mgr2 loads correctly - final HoodieBasedMetadataManager mgr2 = new HoodieBasedMetadataManager(hoodieConf, condition, this.jsc.get()); + final HoodieBasedMetadataManager mgr2 = new HoodieBasedMetadataManager(hoodieConf, hadoopConf, condition, + this.jsc.get()); Assert.assertEquals(testValue, mgr2.get(testKey).get().getValue()); mgr2.remove(testKey); Assert.assertFalse(mgr2.get(testKey).isPresent()); // mgr2 hasn't saved yet, so should still get old value - final HoodieBasedMetadataManager mgr3 = new HoodieBasedMetadataManager(hoodieConf, condition, this.jsc.get()); + final HoodieBasedMetadataManager mgr3 = new HoodieBasedMetadataManager(hoodieConf, hadoopConf, condition, + this.jsc.get()); Assert.assertEquals(testValue, mgr3.get(testKey).get().getValue()); // save 
remove @@ -133,7 +138,8 @@ public void testRemove() throws Exception { mgr2.saveChanges(); // new load shouldn't find it anymore - final HoodieBasedMetadataManager mgr4 = new HoodieBasedMetadataManager(hoodieConf, condition, this.jsc.get()); + final HoodieBasedMetadataManager mgr4 = new HoodieBasedMetadataManager(hoodieConf, hadoopConf, condition, + this.jsc.get()); Assert.assertFalse(mgr4.get(testKey).isPresent()); } diff --git a/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java b/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java index 3a0076d..c26980d 100644 --- a/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java +++ b/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java @@ -18,10 +18,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; -import com.uber.hoodie.config.HoodieWriteConfig; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; +import com.uber.marmaray.common.configuration.HadoopConfiguration; import com.uber.marmaray.common.configuration.HoodieConfiguration; import com.uber.marmaray.common.converters.data.HoodieSinkDataConverter; import com.uber.marmaray.common.converters.data.TSBasedHoodieSinkDataConverter; @@ -86,21 +88,23 @@ class MockHoodieSink extends HoodieSink { private HoodieWriteClientWrapper mockWriteClient; public MockHoodieSink(@NonNull final HoodieConfiguration hoodieConf, - @NonNull final HoodieSinkDataConverter hoodieKeyGenerator, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op) { - super(hoodieConf, hoodieKeyGenerator, jsc, op, new MemoryMetadataManager(), Optional.absent()); + @NonNull final HadoopConfiguration hadoopConf, + @NonNull final HoodieSinkDataConverter hoodieKeyGenerator, + @NonNull final JavaSparkContext jsc) { + super(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc, new MemoryMetadataManager(), Optional.absent()); } public MockHoodieSink(@NonNull final HoodieConfiguration hoodieConf, - @NonNull final HoodieSinkDataConverter hoodieKeyGenerator, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op, - @NonNull final IMetadataManager metadataMgr) { - super(hoodieConf, hoodieKeyGenerator, jsc, op, metadataMgr, Optional.absent()); + @NonNull final HadoopConfiguration hadoopConf, + @NonNull final HoodieSinkDataConverter hoodieKeyGenerator, + @NonNull final JavaSparkContext jsc, + @NonNull final IMetadataManager metadataMgr) { + super(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc, metadataMgr, Optional.absent()); } @Override protected HoodieWriteClientWrapper getHoodieWriteClient( - @NonNull final HoodieWriteConfig hoodieWriteConfig) { + @NonNull final HoodieWriteConfig hoodieWriteConfig) { this.mockWriteClient = spy(super.getHoodieWriteClient(hoodieWriteConfig)); return this.mockWriteClient; } @@ -121,12 +125,12 @@ public void testUpdateInsertParallelism() { final String tableName = "test-table"; final String schemaStr = getSchema("TS", "RECORD_KEY", 4, 8).toString(); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - 
.withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withSinkOp("NO_OP").enableMetrics(false).build(); + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); final HoodieSink mockSink = - spy(new HoodieSink(hoodieConf, mock(HoodieSinkDataConverter.class), - mock(JavaSparkContext.class), HoodieSink.HoodieSinkOp.NO_OP, new NoOpMetadataManager(), - Optional.absent())); + spy(new HoodieSink(hoodieConf, hadoopConf, mock(HoodieSinkDataConverter.class), mock(JavaSparkContext.class), + new NoOpMetadataManager(), Optional.absent())); when(mockSink.calculateNewBulkInsertParallelism(anyLong())).thenReturn(18); Assert.assertTrue(mockSink.updateInsertParallelism(1000)); Assert.assertEquals(18, hoodieConf.getInsertParallelism()); @@ -139,12 +143,12 @@ public void testUpdateBulkInsertParallelism() { final String tableName = "test-table"; final String schemaStr = getSchema("TS", "RECORD_KEY", 4, 8).toString(); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withSinkOp("NO_OP").enableMetrics(false).build(); + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); final HoodieSink mockSink = - spy(new HoodieSink(hoodieConf, mock(HoodieSinkDataConverter.class), - mock(JavaSparkContext.class), HoodieSink.HoodieSinkOp.NO_OP, new NoOpMetadataManager(), - Optional.absent())); + spy(new HoodieSink(hoodieConf, hadoopConf, mock(HoodieSinkDataConverter.class), mock(JavaSparkContext.class), + new NoOpMetadataManager(), Optional.absent())); when(mockSink.calculateNewBulkInsertParallelism(anyLong())).thenReturn(18); Assert.assertTrue(mockSink.updateBulkInsertParallelism(1000)); Assert.assertEquals(18, hoodieConf.getBulkInsertParallelism()); @@ -156,14 +160,17 @@ public void testHoodieSinkWriteInsertWithoutMetadata() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), INSERT); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).withSinkOp("INSERT").enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc.get()); final JavaRDD inputRDD = - 
this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); + this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink.setDataFeedMetrics(dfm); @@ -173,7 +180,7 @@ public void testHoodieSinkWriteInsertWithoutMetadata() throws IOException { // It should generate exactly one commit file. Assert.assertEquals(1, getCommitFiles(basePath, FSUtils.getFs(new Configuration(), - Optional.of(basePath))).size()); + Optional.of(basePath))).size()); /* Expected function calls. 1) startCommit (once). @@ -185,15 +192,15 @@ public void testHoodieSinkWriteInsertWithoutMetadata() throws IOException { */ Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)).startCommit(); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(0)) - .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .insert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .insert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), - Matchers.same(java.util.Optional.empty())); + .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), + Matchers.same(Option.empty())); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)).close(); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(0)) - .bulkInsert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .bulkInsert(Matchers.any(JavaRDD.class), Matchers.anyString()); } @Test @@ -201,14 +208,17 @@ public void testHoodieSinkWriteUpsertWithoutMetadata() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), UPSERT); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).withSinkOp("UPSERT").enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc.get()); final JavaRDD inputRDD = - this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); + this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink.setDataFeedMetrics(dfm); @@ -217,7 +227,7 @@ public void testHoodieSinkWriteUpsertWithoutMetadata() throws IOException { final HoodieWriteClientWrapper hoodieWriteClientWrapper = hoodieSink.getMockWriteClient(); // It 
should generate exactly one commit file. Assert.assertEquals(1, getCommitFiles(basePath, FSUtils.getFs(new Configuration(), - Optional.of(basePath))).size()); + Optional.of(basePath))).size()); /* Expected function calls. 1) startCommit (once). @@ -230,12 +240,12 @@ public void testHoodieSinkWriteUpsertWithoutMetadata() throws IOException { Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)).startCommit(); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(0)) - .bulkInsert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .bulkInsert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), - Matchers.same(java.util.Optional.empty())); + .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), + Matchers.same(Option.empty())); } @Test @@ -243,18 +253,21 @@ public void testHoodieSinkWriteInsertWithMetadata() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).withSinkOp("INSERT").enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); final HoodieBasedMetadataManager hoodieBasedMetadataManager = new HoodieBasedMetadataManager(hoodieConf, - new AtomicBoolean(true), this.jsc.get()); + hadoopConf, new AtomicBoolean(true), this.jsc.get()); hoodieBasedMetadataManager.set("randomKey", new StringValue("randomValue")); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), INSERT, - hoodieBasedMetadataManager); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc.get(), + hoodieBasedMetadataManager); final JavaRDD inputRDD = - this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); + this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink.setDataFeedMetrics(dfm); @@ -264,7 +277,7 @@ public void testHoodieSinkWriteInsertWithMetadata() throws IOException { // It should generate exactly one commit file. Assert.assertEquals(1, getCommitFiles(basePath, FSUtils.getFs(new Configuration(), - Optional.of(basePath))).size()); + Optional.of(basePath))).size()); /* Expected function calls. 1) startCommit (once). 
@@ -276,12 +289,12 @@ public void testHoodieSinkWriteInsertWithMetadata() throws IOException { */ Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)).startCommit(); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(0)) - .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .insert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .insert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), - Matchers.eq(java.util.Optional.of(hoodieBasedMetadataManager.getMetadataInfo()))); + .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), + Matchers.eq(Option.of(hoodieBasedMetadataManager.getMetadataInfo()))); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)).close(); Assert.assertFalse(hoodieBasedMetadataManager.shouldSaveChanges().get()); } @@ -291,19 +304,21 @@ public void testHoodieSinkWriteUpsertWithMetadata() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).withSinkOp("UPSERT").enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); final HoodieBasedMetadataManager hoodieBasedMetadataManager = new HoodieBasedMetadataManager(hoodieConf, - new AtomicBoolean(true), this.jsc.get()); + hadoopConf, new AtomicBoolean(true), this.jsc.get()); hoodieBasedMetadataManager.set("randomKey", new StringValue("randomValue")); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), UPSERT, - hoodieBasedMetadataManager); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc.get(), + hoodieBasedMetadataManager); final JavaRDD inputRDD = - this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); + this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink.setDataFeedMetrics(dfm); @@ -312,7 +327,7 @@ public void testHoodieSinkWriteUpsertWithMetadata() throws IOException { final HoodieWriteClientWrapper hoodieWriteClientWrapper = hoodieSink.getMockWriteClient(); // It should generate exactly one commit file. Assert.assertEquals(1, getCommitFiles(basePath, FSUtils.getFs(new Configuration(), - Optional.of(basePath))).size()); + Optional.of(basePath))).size()); /* Expected function calls. 1) startCommit (once). 
@@ -325,12 +340,12 @@ public void testHoodieSinkWriteUpsertWithMetadata() throws IOException { Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)).startCommit(); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(0)) - .bulkInsert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .bulkInsert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), - Matchers.eq(java.util.Optional.of(hoodieBasedMetadataManager.getMetadataInfo()))); + .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), + Matchers.eq(Option.of(hoodieBasedMetadataManager.getMetadataInfo()))); Assert.assertFalse(hoodieBasedMetadataManager.shouldSaveChanges().get()); } @@ -339,16 +354,18 @@ public void testHoodieSinkWriteDedupeInsert() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr) - .withCombineBeforeInsert(true).withCombineBeforeUpsert(true).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withCombineBeforeInsert(true) + .withCombineBeforeUpsert(true).withRecordKey(RECORD_KEY).withSinkOp("DEDUP_INSERT") + .withPartitionPath(TS_KEY).enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); final JavaRDD inputRDD = - this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); + this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), DEDUP_INSERT); + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc.get()); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink.setDataFeedMetrics(dfm); @@ -358,7 +375,7 @@ public void testHoodieSinkWriteDedupeInsert() throws IOException { // It should generate exactly one commit file. Assert.assertEquals(1, getCommitFiles(basePath, FSUtils.getFs(new Configuration(), - Optional.of(basePath))).size()); + Optional.of(basePath))).size()); /* Expected function calls. 1) startCommit (once). 
@@ -370,24 +387,24 @@ public void testHoodieSinkWriteDedupeInsert() throws IOException { */ Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)).startCommit(); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(0)) - .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .upsert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .insert(Matchers.any(JavaRDD.class), Matchers.anyString()); + .insert(Matchers.any(JavaRDD.class), Matchers.anyString()); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .filterExists(Matchers.any(JavaRDD.class)); + .filterExists(Matchers.any(JavaRDD.class)); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)) - .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), - Matchers.same(java.util.Optional.empty())); + .commit(Matchers.anyString(), Matchers.any(JavaRDD.class), + Matchers.same(Option.empty())); Mockito.verify(hoodieWriteClientWrapper, Mockito.times(1)).close(); // If we try to re-insert then it should find all the records as a a part filterExists test and should not // call bulkInsert. - final MockHoodieSink hoodieSink2 = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), DEDUP_INSERT); + final MockHoodieSink hoodieSink2 = new MockHoodieSink(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc.get()); hoodieSink.write(inputRDD); final HoodieWriteClientWrapper hoodieWriteClientWrapper2 = hoodieSink.getMockWriteClient(); Assert.assertEquals(2, getCommitFiles(basePath, FSUtils.getFs(new Configuration(), - Optional.of(basePath))).size()); + Optional.of(basePath))).size()); final ArgumentCaptor rddCaputure = ArgumentCaptor.forClass(JavaRDD.class); verify(hoodieWriteClientWrapper2).insert(rddCaputure.capture(), Matchers.any()); // All records should get filtered out. 
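The reworked tests above show the new configuration-driven wiring: the record key, partition path and sink operation are read from HoodieConfiguration instead of being passed as constructor arguments, and a HadoopConfiguration is threaded through to the sink. A minimal sketch of how a caller might assemble the same pieces; table name, base path, schema string and field names are placeholders supplied by the caller.

    import com.google.common.base.Optional;
    import com.uber.marmaray.common.configuration.Configuration;
    import com.uber.marmaray.common.configuration.HadoopConfiguration;
    import com.uber.marmaray.common.configuration.HoodieConfiguration;
    import com.uber.marmaray.common.converters.data.HoodieSinkDataConverter;
    import com.uber.marmaray.common.converters.data.TSBasedHoodieSinkDataConverter;
    import com.uber.marmaray.common.metadata.IMetadataManager;
    import com.uber.marmaray.common.sinks.hoodie.HoodieSink;
    import org.apache.spark.api.java.JavaSparkContext;
    import java.util.concurrent.TimeUnit;

    final class HoodieSinkWiringSketch {
        // tableName, basePath, schemaStr and the field names below are caller-supplied placeholders.
        static HoodieSink buildSink(final Configuration conf,
                                    final JavaSparkContext jsc,
                                    final IMetadataManager metadataManager,
                                    final String tableName,
                                    final String basePath,
                                    final String schemaStr) {
            final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName)
                .withTableName(tableName)
                .withMetricsPrefix("test")
                .withBasePath(basePath)
                .withSchema(schemaStr)
                .withRecordKey("record_key_field")      // previously passed to the converter constructor
                .withPartitionPath("timestamp_field")   // previously passed to the converter constructor
                .withSinkOp("UPSERT")                   // e.g. INSERT, UPSERT, BULK_INSERT, DEDUP_INSERT, NO_OP
                .enableMetrics(false)
                .build();
            final HadoopConfiguration hadoopConf = new HadoopConfiguration(conf);
            final HoodieSinkDataConverter keyGenerator =
                new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS);
            // The sink operation is no longer a constructor argument; it is resolved from hoodieConf.
            return new HoodieSink(hoodieConf, hadoopConf, keyGenerator, jsc, metadataManager, Optional.absent());
        }
    }

Passing the operation as a plain string keeps the test builders compact while leaving the enum resolution to the configuration layer.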
@@ -400,12 +417,16 @@ public void testHoodieSinkMetrics() throws IOException { final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); final String brokenSchemaStr = getSchema(TS_KEY, RECORD_KEY, 0, 0).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); + final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), INSERT); + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).withSinkOp("INSERT").enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc.get()); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink.setDataFeedMetrics(dfm); @@ -419,21 +440,21 @@ public void testHoodieSinkMetrics() throws IOException { final Set ms = dfm.getMetricSet(); final Map metricMap = new HashMap<>(); - ms.forEach( metric -> { + ms.forEach(metric -> { final String key = metric.getMetricName(); metricMap.put(key, metric.getMetricValue()); }); final Map expected = ArrayUtils.toMap( - new Object[][] { + new Object[][]{ {"output_rowcount", successRecordCount.longValue()}, {"error_rowcount", failedRecordCount.longValue()}, - {"total_file_count", (long)metricMap.get("total_file_count")}, - {"total_write_size", (long)metricMap.get("total_write_size")} + {"total_file_count", (long) metricMap.get("total_file_count")}, + {"total_write_size", (long) metricMap.get("total_write_size")} }); Assert.assertEquals(expected.size(), ms.size()); - ms.forEach( metric -> { + ms.forEach(metric -> { final String key = metric.getMetricName(); Assert.assertEquals("failure for metric " + key, expected.get(key), metric.getMetricValue()); }); @@ -444,15 +465,19 @@ public void testUserDefinedCommitTime() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); + final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).withSinkOp("BULK_INSERT").enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + final JavaRDD inputRDD = - this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); + this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); - final MockHoodieSink 
hoodieSink1 = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), BULK_INSERT); + final HadoopConfiguration hadoopConf = new HadoopConfiguration(new Configuration()); + final MockHoodieSink hoodieSink1 = new MockHoodieSink(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc.get()); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink1.setDataFeedMetrics(dfm); @@ -461,14 +486,14 @@ public void testUserDefinedCommitTime() throws IOException { Mockito.verify(hoodieWriteClientWrapper1, Mockito.times(1)).startCommit(); final List commitFilesAfterFirstCommit = getCommitFiles(basePath, FSUtils.getFs(new Configuration(), - Optional.of(basePath))); + Optional.of(basePath))); Assert.assertEquals(1, commitFilesAfterFirstCommit.size()); final String customCommit = - HoodieActiveTimeline.COMMIT_FORMATTER.format( - new Date(new Date().getTime() - TimeUnit.DAYS.toMillis(365))); + HoodieActiveTimeline.COMMIT_FORMATTER.format( + new Date(new Date().getTime() - TimeUnit.DAYS.toMillis(365))); - final MockHoodieSink hoodieSink2 = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), BULK_INSERT); + final MockHoodieSink hoodieSink2 = new MockHoodieSink(hoodieConf, hadoopConf, hoodieKeyGenerator, jsc.get()); hoodieSink2.setDataFeedMetrics(dfm); hoodieSink2.setCommitTime(com.google.common.base.Optional.of(customCommit)); @@ -477,7 +502,7 @@ public void testUserDefinedCommitTime() throws IOException { Mockito.verify(hoodieWriteClientWrapper2, Mockito.times(0)).startCommit(); final List commitFilesAfterSecondCommit = getCommitFiles(basePath, FSUtils.getFs(new Configuration(), - Optional.of(basePath))); + Optional.of(basePath))); Assert.assertEquals(2, commitFilesAfterSecondCommit.size()); final String oldCommitTime = commitFilesAfterFirstCommit.get(0); diff --git a/marmaray/src/test/resources/log4j-surefire.properties b/marmaray/src/test/resources/log4j-surefire.properties index f75a311..18d1f40 100644 --- a/marmaray/src/test/resources/log4j-surefire.properties +++ b/marmaray/src/test/resources/log4j-surefire.properties @@ -1,6 +1,6 @@ log4j.rootLogger=WARN, A1 log4j.category.com.uber=WARN -log4j.category.com.uber.hoodie.common.utils=WARN +log4j.category.org.apache.hudi.common.utils=WARN log4j.category.org.apache.parquet.hadoop=WARN # A1 is set to be a ConsoleAppender. diff --git a/pom.xml b/pom.xml index 11b37f8..5acef08 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ 1.1.0 2.6 1.2 - 0.4.1 + 0.5.0-incubating-rc5 4.5.2 3.1.3.2 3.2.0 @@ -610,9 +610,9 @@ - com.uber.hoodie - hoodie-client - ${hoodie.version} + org.apache.hudi + hudi-client + ${hudi.version} com.google.guava @@ -641,9 +641,9 @@ - com.uber.hoodie - hoodie-common - ${hoodie.version} + org.apache.hudi + hudi-common + ${hudi.version} org.codehaus.jackson
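With the dependency coordinates switched to org.apache.hudi above, callers that construct the metadata manager directly also pick up the extra HadoopConfiguration argument used throughout the updated tests and the new KafkaToHoodieJob. A minimal sketch of the updated construction; the helper class and method names here are illustrative, not part of the change.

    import com.uber.marmaray.common.configuration.Configuration;
    import com.uber.marmaray.common.configuration.HadoopConfiguration;
    import com.uber.marmaray.common.configuration.HoodieConfiguration;
    import com.uber.marmaray.common.metadata.HoodieBasedMetadataManager;
    import com.uber.marmaray.common.metadata.IMetadataManager;
    import org.apache.spark.api.java.JavaSparkContext;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    final class MetadataManagerWiringSketch {
        static IMetadataManager create(final Configuration conf,
                                       final HoodieConfiguration hoodieConf,
                                       final JavaSparkContext jsc) throws IOException {
            // The Hadoop configuration is now threaded through explicitly alongside the Hoodie configuration.
            final HadoopConfiguration hadoopConf = new HadoopConfiguration(conf);
            return new HoodieBasedMetadataManager(hoodieConf, hadoopConf, new AtomicBoolean(true), jsc);
        }
    }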