Skip to content

Commit ad1e72b

Browse files
feat: Add the ability to set gRPC compression in the CPS sink connector. (googleapis#259)
* feat: Add the ability to set gRPC compression in the CPS sink connector. * Add units to docs
1 parent b7eea57 commit ad1e72b

File tree

3 files changed

+29
-0
lines changed

3 files changed

+29
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ configurations:
210210
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
211211
| orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that using "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
212212
| messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
213+
| enableCompression | Boolean | false | When true, enable [publish-side compression](https://cloud.google.com/pubsub/docs/publisher#compressing) in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
214+
| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of publish request (in bytes) to compress.
213215

214216
### Pub/Sub Lite connector configs
215217

src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public class CloudPubSubSinkConnector extends SinkConnector {
4747
public static final String MAX_REQUEST_TIMEOUT_MS = "maxRequestTimeoutMs";
4848
public static final String MAX_TOTAL_TIMEOUT_MS = "maxTotalTimeoutMs";
4949
public static final String MAX_SHUTDOWN_TIMEOUT_MS = "maxShutdownTimeoutMs";
50+
public static final String ENABLE_COMPRESSION = "enableCompression";
51+
public static final String COMPRESSION_BYTES_THRESHOLD = "compressionBytesThreshold";
5052
public static final int DEFAULT_MAX_BUFFER_SIZE = 100;
5153
public static final long DEFAULT_MAX_BUFFER_BYTES = 9500000L;
5254
public static final int DEFAULT_DELAY_THRESHOLD_MS = 100;
@@ -61,6 +63,8 @@ public class CloudPubSubSinkConnector extends SinkConnector {
6163
public static final String PUBLISH_KAFKA_HEADERS = "headers.publish";
6264
public static final String ORDERING_KEY_SOURCE = "orderingKeySource";
6365
public static final String DEFAULT_ORDERING_KEY_SOURCE = "none";
66+
public static final boolean DEFAULT_ENABLE_COMPRESSION = false;
67+
public static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 240L;
6468

6569
/** Defines the accepted values for the {@link #ORDERING_KEY_SOURCE}. */
6670
public enum OrderingKeySource {
@@ -250,6 +254,20 @@ public ConfigDef config() {
250254
Importance.MEDIUM,
251255
"What to use to populate the Pub/Sub message ordering key. Possible values are "
252256
+ "\"none\", \"key\", or \"partition\".")
257+
.define(
258+
ENABLE_COMPRESSION,
259+
Type.BOOLEAN,
260+
DEFAULT_ENABLE_COMPRESSION,
261+
Importance.MEDIUM,
262+
"When \"true\", use gRPC Gzip compression on publish requests before sending them "
263+
+ "to Cloud Pub/Sub.")
264+
.define(
265+
COMPRESSION_BYTES_THRESHOLD,
266+
Type.LONG,
267+
DEFAULT_COMPRESSION_BYTES_THRESHOLD,
268+
Importance.MEDIUM,
269+
"The number of bytes at which to compress a request when publishing to "
270+
+ "Cloud Pub/Sub. Only takes effect if \"enableCompression\" is \"true\".")
253271
.define(
254272
ConnectorUtils.CPS_ENDPOINT,
255273
Type.STRING,

src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public class CloudPubSubSinkTask extends SinkTask {
8181
private boolean includeMetadata;
8282
private boolean includeHeaders;
8383
private OrderingKeySource orderingKeySource;
84+
private boolean enableCompression;
85+
private long compressionBytesThreshold;
8486
private ConnectorCredentialsProvider gcpCredentialsProvider;
8587
private com.google.cloud.pubsub.v1.Publisher publisher;
8688

@@ -135,6 +137,9 @@ public void start(Map<String, String> props) {
135137
orderingKeySource =
136138
OrderingKeySource.getEnum(
137139
(String) validatedProps.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE));
140+
enableCompression = (Boolean) validatedProps.get(CloudPubSubSinkConnector.ENABLE_COMPRESSION);
141+
compressionBytesThreshold =
142+
(Long) validatedProps.get(CloudPubSubSinkConnector.COMPRESSION_BYTES_THRESHOLD);
138143
gcpCredentialsProvider = ConnectorCredentialsProvider.fromConfig(validatedProps);
139144
if (publisher == null) {
140145
// Only do this if we did not use the constructor.
@@ -413,6 +418,10 @@ private void createPublisher() {
413418
if (orderingKeySource != OrderingKeySource.NONE) {
414419
builder.setEnableMessageOrdering(true);
415420
}
421+
if (enableCompression) {
422+
builder.setEnableCompression(true);
423+
builder.setCompressionBytesThreshold(compressionBytesThreshold);
424+
}
416425
try {
417426
publisher = builder.build();
418427
} catch (Exception e) {

0 commit comments

Comments
 (0)