diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift index 60c41120..2ab71f4c 100644 --- a/singer-commons/src/main/thrift/config.thrift +++ b/singer-commons/src/main/thrift/config.thrift @@ -221,6 +221,7 @@ struct MemqWriterConfig { 12: optional MemqAuditorConfig auditorConfig; 13: optional i32 maxInFlightRequestsMemoryBytes = 33554432; // 32 MB 14: optional i32 maxBlockMs = 0; // non-blocking by default + 15: optional i32 numWriteEndpoints = 1; // producer to broker mapping, 1:1 by default } struct PulsarProducerConfig { diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java index 3e1657f2..728a987f 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java @@ -457,6 +457,9 @@ public Void apply(byte[] data) { if (configuration.containsKey("maxBlockMs")) { config.setMaxBlockMs(configuration.getInt("maxBlockMs")); } + if (configuration.containsKey("numWriteEndpoints")) { + config.setNumWriteEndpoints(configuration.getInt("numWriteEndpoints")); + } return config; }