Skip to content
This repository was archived by the owner on Mar 10, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions s2-sdk/src/main/java/s2/client/StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class StreamClient extends BasinClient {
/** Name of stream associated with this client. */
final String streamName;

private static final String compressionCodec = "gzip";

private final StreamServiceFutureStub futureStub;
final StreamServiceStub asyncStub;

Expand All @@ -61,14 +63,23 @@ public StreamClient(
var meta = new Metadata();
meta.put(Key.of("s2-basin", Metadata.ASCII_STRING_MARSHALLER), basin);
this.streamName = streamName;
this.futureStub =

StreamServiceFutureStub futureStub =
StreamServiceGrpc.newFutureStub(channel)
.withCallCredentials(new BearerTokenCallCredentials(config.token))
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta));
this.asyncStub =
StreamServiceStub asyncStub =
StreamServiceGrpc.newStub(channel)
.withCallCredentials(new BearerTokenCallCredentials(config.token))
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta));

if (config.compression) {
futureStub = futureStub.withCompression(compressionCodec);
asyncStub = asyncStub.withCompression(compressionCodec);
}

this.futureStub = futureStub;
this.asyncStub = asyncStub;
}

/**
Expand Down
10 changes: 10 additions & 0 deletions s2-sdk/src/main/java/s2/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
public class Config {
public final String token;
public final AppendRetryPolicy appendRetryPolicy;
public final Boolean compression;
public final Endpoints endpoints;
public final Integer maxAppendInflightBytes;
public final Integer maxRetries;
Expand All @@ -17,6 +18,7 @@ public class Config {
private Config(
String token,
AppendRetryPolicy appendRetryPolicy,
Boolean compression,
Endpoints endpoints,
Integer maxAppendInflightBytes,
Integer maxRetries,
Expand All @@ -25,6 +27,7 @@ private Config(
String userAgent) {
this.token = token;
this.appendRetryPolicy = appendRetryPolicy;
this.compression = compression;
this.endpoints = endpoints;
this.maxAppendInflightBytes = maxAppendInflightBytes;
this.maxRetries = maxRetries;
Expand All @@ -46,6 +49,7 @@ public static final class ConfigBuilder {
private Optional<Duration> requestTimeout = Optional.empty();
private Optional<Duration> retryDelay = Optional.empty();
private Optional<String> userAgent = Optional.empty();
private Optional<Boolean> compression = Optional.empty();

ConfigBuilder(String token) {
this.token = token;
Expand All @@ -56,6 +60,11 @@ public ConfigBuilder withAppendRetryPolicy(AppendRetryPolicy appendRetryPolicy)
return this;
}

public ConfigBuilder withCompression(Boolean compression) {
this.compression = Optional.of(compression);
return this;
}

public ConfigBuilder withEndpoints(Endpoints endpoints) {
this.endpoints = Optional.of(endpoints);
return this;
Expand Down Expand Up @@ -91,6 +100,7 @@ public Config build() {
return new Config(
this.token,
this.appendRetryPolicy.orElse(AppendRetryPolicy.ALL),
this.compression.orElse(false),
this.endpoints.orElse(Endpoints.forCloud(Cloud.AWS)),
this.maxAppendInflightBytes.orElse(Integer.MAX_VALUE),
this.maxRetries.orElse(3),
Expand Down
Loading