Skip to content
This repository was archived by the owner on Jan 5, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
*/
package com.uber.marmaray.common;

import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieAvroUtils;
import com.uber.marmaray.common.exceptions.JobRuntimeException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import lombok.NonNull;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.Optional;
Expand All @@ -40,13 +41,13 @@ public HoodieErrorPayload(@NonNull final GenericRecord record) {
}

@Override
public Optional<IndexedRecord> getInsertValue(final Schema schema) throws IOException {
final Optional<GenericRecord> record = getRecord();
/**
 * Produces the record to insert by rewriting the wrapped error record
 * against the supplied writer schema.
 *
 * @param schema target Avro schema to rewrite the record into
 * @return the rewritten record, or an empty {@link Option} when no record is present
 * @throws IOException declared by the payload contract
 */
public Option<IndexedRecord> getInsertValue(final Schema schema) throws IOException {
    return getRecord().map(rec -> HoodieAvroUtils.rewriteRecord(rec, schema));
}

protected Optional<GenericRecord> getRecord() {
return Optional.of(this.record);
/**
 * @return the wrapped error record.
 *     NOTE(review): {@code Option.of} is non-null-hostile in Hudi — if
 *     {@code this.record} can ever be null this will throw; confirm the
 *     constructor guarantees non-null (it is annotated {@code @NonNull}).
 */
protected Option<GenericRecord> getRecord() {
    return Option.of(this.record);
}

@Override
Expand All @@ -55,7 +56,7 @@ public HoodieErrorPayload preCombine(final HoodieErrorPayload hoodieErrorPayload
}

@Override
public Optional<IndexedRecord> combineAndGetUpdateValue(final IndexedRecord indexedRecord, final Schema schema)
// Updates are not supported for error payloads: errors are write-once records,
// so combining an incoming record with a stored one has no defined meaning here.
public Option<IndexedRecord> combineAndGetUpdateValue(final IndexedRecord indexedRecord, final Schema schema)
throws IOException {
    throw new JobRuntimeException("Not implemented yet!!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
package com.uber.marmaray.common.configuration;

import com.google.common.base.Optional;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.table.HoodieTableConfig;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieMetricsConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import com.uber.marmaray.common.exceptions.JobRuntimeException;
import com.uber.marmaray.common.exceptions.MissingPropertyException;
import com.uber.marmaray.common.sinks.hoodie.HoodieSink;
import com.uber.marmaray.utilities.ConfigUtil;
import com.uber.marmaray.utilities.StringTypes;
import lombok.Getter;
Expand Down Expand Up @@ -65,6 +66,22 @@ public class HoodieConfiguration implements Serializable {
* Schema for Hoodie dataset
*/
public static final String HOODIE_AVRO_SCHEMA = HOODIE_COMMON_PROPERTY_PREFIX + "schema";

/**
* Record Key for Hoodie dataset
*/
public static final String HOODIE_RECORD_KEY = HOODIE_COMMON_PROPERTY_PREFIX + "record_key";

/**
* Partition path for Hoodie dataset
*/
public static final String HOODIE_PARTITION_PATH = HOODIE_COMMON_PROPERTY_PREFIX + "partition_path";

/**
* Sink operation for Hoodie dataset
*/
public static final String HOODIE_SINK_OP = HOODIE_COMMON_PROPERTY_PREFIX + "sink_op";

/**
* Flag to control whether it should combine before insert
*/
Expand Down Expand Up @@ -250,6 +267,31 @@ public String getTableName() {
return this.getConf().getProperty(getTablePropertyKey(HOODIE_TABLE_NAME, this.tableKey)).get();
}

/**
 * @return the configured hoodie record key field name, if the property is set.
 */
public Optional<String> getHoodieRecordKey() {
    final String propertyKey = getTablePropertyKey(HOODIE_RECORD_KEY, this.tableKey);
    return this.conf.getProperty(propertyKey);
}

/**
 * @return the configured hoodie partition path field name, if the property is set.
 */
public Optional<String> getHoodiePartitionPath() {
    final String propertyKey = getTablePropertyKey(HOODIE_PARTITION_PATH, this.tableKey);
    return this.conf.getProperty(propertyKey);
}

/**
 * @return the configured hoodie sink operation, falling back to
 *         {@code BULK_INSERT} when the property is absent.
 */
public HoodieSink.HoodieSinkOp getHoodieSinkOp() {
    final Optional<String> sinkOp = this.conf.getProperty(getTablePropertyKey(HOODIE_SINK_OP, this.tableKey));
    if (sinkOp.isPresent()) {
        // Use a fixed locale so the enum lookup is not broken by locale-sensitive
        // casing rules (e.g. Turkish dotless-i); Enum.valueOf requires an exact match.
        return HoodieSink.HoodieSinkOp.valueOf(sinkOp.get().toUpperCase(java.util.Locale.ROOT));
    }
    return HoodieSink.HoodieSinkOp.BULK_INSERT;
}

/**
* @return hoodie metrics prefix.
* */
Expand All @@ -263,7 +305,7 @@ public String getHoodieDataPartitioner(@NotEmpty final String defaultDataPartiti
}

/**
* @return true if {@link com.uber.hoodie.HoodieWriteClient} should rollback inflight commits from previous write
* @return true if {@link org.apache.hudi.HoodieWriteClient} should rollback inflight commits from previous write
* call.
*/
public boolean shouldRollbackInFlight() {
Expand Down Expand Up @@ -392,6 +434,7 @@ public HoodieWriteConfig getHoodieWriteConfig() {
).build());

// Hoodie index config

builder.withIndexConfig(new HoodieIndexConfiguration(getConf(), getTableKey()).configureHoodieIndex());

// Hoodie metrics config
Expand Down Expand Up @@ -421,12 +464,6 @@ public HoodieWriteConfig getHoodieWriteConfig() {
throw new JobRuntimeException(errorStr, e);
}

// enable tmp directory writes for hoodie.
builder.withUseTempFolderCopyOnWriteForCreate(true);

// enabled the renaming for copy detection on merge
builder.withUseTempFolderCopyOnWriteForMerge(true);

return builder.build();
} catch (IllegalArgumentException e) {
throw new MissingPropertyException(e.getMessage(), e);
Expand Down Expand Up @@ -492,11 +529,26 @@ public Builder withBasePath(@NotEmpty final String basePath) {
return this;
}

/**
 * Sets the record key field for the hoodie table being configured.
 */
public Builder withRecordKey(@NotEmpty final String recordKey) {
    final String propertyKey = getTablePropertyKey(HOODIE_RECORD_KEY, this.tableKey);
    this.conf.setProperty(propertyKey, recordKey);
    return this;
}

/**
 * Sets the partition path field for the hoodie table being configured.
 */
public Builder withPartitionPath(@NotEmpty final String partitionPath) {
    final String propertyKey = getTablePropertyKey(HOODIE_PARTITION_PATH, this.tableKey);
    this.conf.setProperty(propertyKey, partitionPath);
    return this;
}

/**
 * Sets the Avro schema (as a string) for the hoodie table being configured.
 */
public Builder withSchema(@NotEmpty final String schema) {
    final String propertyKey = getTablePropertyKey(HOODIE_AVRO_SCHEMA, this.tableKey);
    this.conf.setProperty(propertyKey, schema);
    return this;
}

/**
 * Sets the sink operation (e.g. INSERT, BULK_INSERT) for the hoodie table being configured.
 */
public Builder withSinkOp(@NotEmpty final String sinkOp) {
    final String propertyKey = getTablePropertyKey(HOODIE_SINK_OP, this.tableKey);
    this.conf.setProperty(propertyKey, sinkOp);
    return this;
}

public Builder withBulkInsertParallelism(final int parallelism) {
this.conf.setProperty(
getTablePropertyKey(HOODIE_BULKINSERT_PARALLELISM, this.tableKey), Integer.toString(parallelism));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package com.uber.marmaray.common.configuration;

import com.google.common.base.Preconditions;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.index.HoodieIndex.IndexType;
import org.apache.hudi.config.DefaultHoodieConfig;
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import com.uber.marmaray.common.exceptions.JobRuntimeException;
import com.uber.marmaray.utilities.StringTypes;
import lombok.Getter;
Expand Down Expand Up @@ -165,23 +167,26 @@ public HoodieIndexConfig configureHoodieIndex() {
version = DEFAULT_VERSION;
}
final String topicName = getTableName();

final HoodieIndexConfig.Builder builder = HoodieIndexConfig.newBuilder()
.withIndexType(getHoodieIndexType());

if (HoodieIndex.IndexType.HBASE.equals(getHoodieIndexType())) {
final HoodieHBaseIndexConfig.Builder hoodieHBaseIndexConfigBuilder = HoodieHBaseIndexConfig.newBuilder();
final String quorum = getHoodieIndexZookeeperQuorum();
final Integer port = getHoodieIndexZookeeperPort();
final String zkZnodeParent = getZkZnodeParent();
createHbaseIndexTableIfNotExists(topicName, quorum, port.toString(), zkZnodeParent,
version);
builder
hoodieHBaseIndexConfigBuilder
.hbaseIndexGetBatchSize(getHoodieIndexGetBatchSize())
.hbaseTableName(getHoodieHbaseIndexTableName())
.hbaseZkPort(port)
.hbaseZkQuorum(quorum);
final HoodieHBaseIndexConfig hoodieHBaseIndexConfig = hoodieHBaseIndexConfigBuilder.build();
builder.withHBaseIndexConfig(hoodieHBaseIndexConfig);
}

return builder.build();
return builder.build();
}

public void createHbaseIndexTableIfNotExists(@NotEmpty final String dataFeed, @NotEmpty final String zkQuorum,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.uber.marmaray.common.AvroPayload;
import com.uber.marmaray.common.configuration.Configuration;
import com.uber.marmaray.common.configuration.HoodieConfiguration;
import com.uber.marmaray.utilities.ErrorExtractor;

import lombok.NonNull;
Expand All @@ -29,7 +30,9 @@
*/
public class DummyHoodieSinkDataConverter extends HoodieSinkDataConverter {
// The visible span contained both the removed 2-arg super(...) call and the added
// 3-arg call (diff residue); only the 3-arg form matches the updated parent ctor.
public DummyHoodieSinkDataConverter() {
    // Throw-away HoodieConfiguration keyed on a "test" table: the dummy converter
    // never reads record-key/partition-path properties, but the parent requires one.
    super(new Configuration(), new ErrorExtractor(),
        HoodieConfiguration.newBuilder(new Configuration(), "test").build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package com.uber.marmaray.common.converters.data;

import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import com.uber.marmaray.common.AvroPayload;
import com.uber.marmaray.common.configuration.Configuration;
import com.uber.marmaray.common.configuration.HoodieConfiguration;
import com.uber.marmaray.common.converters.converterresult.ConverterResult;
import com.uber.marmaray.common.exceptions.InvalidDataException;
import com.uber.marmaray.common.metrics.DataFeedMetrics;
import com.uber.marmaray.common.metrics.JobMetrics;
import com.uber.marmaray.common.sinks.hoodie.HoodieSink;
Expand All @@ -35,27 +37,35 @@
import java.util.Collections;
import java.util.List;

import com.google.common.base.Optional;
import org.apache.hudi.common.util.Option;

/**
* {@link HoodieSinkDataConverter} extends {@link SinkDataConverter}
* This class is used by {@link HoodieSink} to generate {@link com.uber.hoodie.common.model.HoodieRecord} from
* This class is used by {@link HoodieSink} to generate {@link org.apache.hudi.common.model.HoodieRecord} from
* {@link com.uber.marmaray.common.AvroPayload}.
*/
public abstract class HoodieSinkDataConverter extends SinkDataConverter<Schema, HoodieRecord<HoodieRecordPayload>> {
public class HoodieSinkDataConverter extends SinkDataConverter<Schema, HoodieRecord<HoodieRecordPayload>> {

// store the schema as a string since Schema doesn't serialize. Used in extended classes.
protected String schema;
private final ErrorExtractor errorExtractor;
private final HoodieConfiguration hoodieConfiguration;

public HoodieSinkDataConverter(@NonNull final Configuration conf, @NonNull final ErrorExtractor errorExtractor) {
/**
 * Creates a converter whose record key and partition path are derived from
 * {@link HoodieConfiguration} properties rather than subclass overrides.
 *
 * @param conf general job configuration, forwarded to the parent converter
 * @param errorExtractor extractor for building error records; also stored locally
 * @param hoodieConfiguration source of the record-key / partition-path / sink-op properties
 */
public HoodieSinkDataConverter(@NonNull final Configuration conf, @NonNull final ErrorExtractor errorExtractor,
    @NonNull final HoodieConfiguration hoodieConfiguration) {
    super(conf, errorExtractor);
    // NOTE(review): errorExtractor is also passed to super(); the local copy is
    // presumably for subclass access — confirm the parent field is not reusable.
    this.errorExtractor = errorExtractor;
    this.hoodieConfiguration = hoodieConfiguration;
}

/**
 * Creates a converter with an explicit Avro schema string.
 *
 * @param conf general job configuration, forwarded to the parent converter
 * @param schema Avro schema as a string (kept as String since Schema is not serializable)
 * @param errorExtractor extractor for building error records; also stored locally
 * @param hoodieConfiguration source of the record-key / partition-path / sink-op properties
 */
public HoodieSinkDataConverter(@NonNull final Configuration conf, final String schema,
    @NonNull final ErrorExtractor errorExtractor,
    // @NonNull final added for consistency with the other constructor: the field is
    // dereferenced unguarded in getRecordKey/getPartitionPath, so null must fail fast here.
    @NonNull final HoodieConfiguration hoodieConfiguration) {
    super(conf, errorExtractor);
    this.schema = schema;
    this.errorExtractor = errorExtractor;
    this.hoodieConfiguration = hoodieConfiguration;
}

@Override
Expand All @@ -82,17 +92,37 @@ protected final List<ConverterResult<AvroPayload, HoodieRecord<HoodieRecordPaylo
*
* @param payload {@link AvroPayload}.
*/
protected abstract String getRecordKey(@NonNull final AvroPayload payload) throws Exception;
/**
 * Resolves the record key for {@code HoodieKey} by reading the field named by the
 * configured record-key property from the payload's data.
 *
 * @param payload the Avro payload to extract the key from
 * @return the record key field value as a string
 * @throws Exception when the record-key property is not configured, or
 *         {@link InvalidDataException} when the named field is absent from the payload
 */
protected String getRecordKey(@NonNull final AvroPayload payload) throws Exception {
    final Optional<String> hoodieRecordKey = this.hoodieConfiguration.getHoodieRecordKey();
    if (!hoodieRecordKey.isPresent()) {
        throw new Exception("Hoodie Record Key missing");
    }
    final Object recordKeyFieldVal = payload.getData().get(hoodieRecordKey.get());
    if (recordKeyFieldVal == null) {
        throw new InvalidDataException("required field is missing:" + hoodieRecordKey.get());
    }
    return recordKeyFieldVal.toString();
}

/**
* The implementation of it should use fields from {@link AvroPayload} to generate partition path which is needed
* for {@link HoodieKey}.
*
* @param payload {@link AvroPayload}.
*/
protected abstract String getPartitionPath(@NonNull final AvroPayload payload) throws Exception;
/**
 * Resolves the partition path for {@code HoodieKey} by reading the field named by the
 * configured partition-path property from the payload's data.
 *
 * @param payload the Avro payload to extract the partition path from
 * @return the partition path field value as a string
 * @throws Exception when the partition-path property is not configured, or
 *         {@link InvalidDataException} when the named field is absent from the payload
 */
protected String getPartitionPath(@NonNull final AvroPayload payload) throws Exception {
    final Optional<String> hoodiePartitionPath = this.hoodieConfiguration.getHoodiePartitionPath();
    if (!hoodiePartitionPath.isPresent()) {
        throw new Exception("Hoodie Partition Path missing");
    }
    final Object partitionFieldVal = payload.getData().get(hoodiePartitionPath.get());
    if (partitionFieldVal == null) {
        throw new InvalidDataException("required field is missing:" + hoodiePartitionPath.get());
    }
    return partitionFieldVal.toString();
}

// The visible span contained both the removed java.util.Optional-based return and the
// added hudi Option-based one (diff residue); only the Option form matches the new
// HoodieAvroPayload constructor.
protected HoodieRecordPayload getPayload(@NonNull final AvroPayload payload) {
    return new HoodieAvroPayload(Option.of(payload.getData()));
}
}
Loading