diff --git a/s2-sdk/src/main/java/s2/client/StreamClient.java b/s2-sdk/src/main/java/s2/client/StreamClient.java index 11929f1..3245839 100644 --- a/s2-sdk/src/main/java/s2/client/StreamClient.java +++ b/s2-sdk/src/main/java/s2/client/StreamClient.java @@ -35,6 +35,8 @@ public class StreamClient extends BasinClient { /** Name of stream associated with this client. */ final String streamName; + private static final String compressionCodec = "gzip"; + private final StreamServiceFutureStub futureStub; final StreamServiceStub asyncStub; @@ -61,14 +63,23 @@ public StreamClient( var meta = new Metadata(); meta.put(Key.of("s2-basin", Metadata.ASCII_STRING_MARSHALLER), basin); this.streamName = streamName; - this.futureStub = + + StreamServiceFutureStub futureStub = StreamServiceGrpc.newFutureStub(channel) .withCallCredentials(new BearerTokenCallCredentials(config.token)) .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)); - this.asyncStub = + StreamServiceStub asyncStub = StreamServiceGrpc.newStub(channel) .withCallCredentials(new BearerTokenCallCredentials(config.token)) .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)); + + if (config.compression) { + futureStub = futureStub.withCompression(compressionCodec); + asyncStub = asyncStub.withCompression(compressionCodec); + } + + this.futureStub = futureStub; + this.asyncStub = asyncStub; } /** diff --git a/s2-sdk/src/main/java/s2/config/Config.java b/s2-sdk/src/main/java/s2/config/Config.java index 78f1286..4386b98 100644 --- a/s2-sdk/src/main/java/s2/config/Config.java +++ b/s2-sdk/src/main/java/s2/config/Config.java @@ -7,6 +7,7 @@ public class Config { public final String token; public final AppendRetryPolicy appendRetryPolicy; + public final Boolean compression; public final Endpoints endpoints; public final Integer maxAppendInflightBytes; public final Integer maxRetries; @@ -17,6 +18,7 @@ public class Config { private Config( String token, AppendRetryPolicy appendRetryPolicy, + Boolean compression, Endpoints endpoints, Integer maxAppendInflightBytes, Integer maxRetries, @@ -25,6 +27,7 @@ private Config( String userAgent) { this.token = token; this.appendRetryPolicy = appendRetryPolicy; + this.compression = compression; this.endpoints = endpoints; this.maxAppendInflightBytes = maxAppendInflightBytes; this.maxRetries = maxRetries; @@ -46,6 +49,7 @@ public static final class ConfigBuilder { private Optional requestTimeout = Optional.empty(); private Optional retryDelay = Optional.empty(); private Optional userAgent = Optional.empty(); + private Optional compression = Optional.empty(); ConfigBuilder(String token) { this.token = token; @@ -56,6 +60,11 @@ public ConfigBuilder withAppendRetryPolicy(AppendRetryPolicy appendRetryPolicy) return this; } + public ConfigBuilder withCompression(Boolean compression) { + this.compression = Optional.of(compression); + return this; + } + public ConfigBuilder withEndpoints(Endpoints endpoints) { this.endpoints = Optional.of(endpoints); return this; @@ -91,6 +100,7 @@ public Config build() { return new Config( this.token, this.appendRetryPolicy.orElse(AppendRetryPolicy.ALL), + this.compression.orElse(false), this.endpoints.orElse(Endpoints.forCloud(Cloud.AWS)), this.maxAppendInflightBytes.orElse(Integer.MAX_VALUE), this.maxRetries.orElse(3),