From a27468b559282b5067bd2e8e062bb99f66c583e4 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 5 Nov 2025 14:50:15 -0500 Subject: [PATCH] Add numWriteEndpoints config for MemqWriter --- singer-commons/src/main/thrift/config.thrift | 1 + .../main/java/com/pinterest/singer/utils/LogConfigUtils.java | 3 +++ 2 files changed, 4 insertions(+) diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift index 60c41120..2ab71f4c 100644 --- a/singer-commons/src/main/thrift/config.thrift +++ b/singer-commons/src/main/thrift/config.thrift @@ -221,6 +221,7 @@ struct MemqWriterConfig { 12: optional MemqAuditorConfig auditorConfig; 13: optional i32 maxInFlightRequestsMemoryBytes = 33554432; // 32 MB 14: optional i32 maxBlockMs = 0; // non-blocking by default + 15: optional i32 numWriteEndpoints = 1; // producer to broker mapping, 1:1 by default } struct PulsarProducerConfig { diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java index 3e1657f2..728a987f 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java @@ -457,6 +457,9 @@ public Void apply(byte[] data) { if (configuration.containsKey("maxBlockMs")) { config.setMaxBlockMs(configuration.getInt("maxBlockMs")); } + if (configuration.containsKey("numWriteEndpoints")) { + config.setNumWriteEndpoints(configuration.getInt("numWriteEndpoints")); + } return config; }