Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ public final class SdkAdvancedAsyncClientOption<T> extends ClientOption<T> {
public static final SdkAdvancedAsyncClientOption<Executor> FUTURE_COMPLETION_EXECUTOR =
new SdkAdvancedAsyncClientOption<>(Executor.class);

/**
* Configure Direct I/O for CRT file-based upload with the s3 async client. Only used with the CRT s3 async client.
* <p>
* Enabling direct I/O bypasses the OS cache. Helpful when the disk I/O outperforms the kernel cache.
* <p>
* Notes:
* <ul>
* <li>Only supported on Linux for now.</li>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we default it to fault if user sets this for non linux ?

* <li>Only supports upload for now.</li>
* <li>Direct I/O is a potentially powerful tool that should be used with caution. Read the NOTES section for
* O_DIRECT at https://man7.org/linux/man-pages/man2/openat.2.html for additional information.</li>
* </ul>
*/
public static final SdkAdvancedAsyncClientOption<Boolean> CRT_UPLOAD_FILE_DIRECT_IO =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we create a new Option class for CRT and put CRT specific option there?

new SdkAdvancedAsyncClientOption<>(Boolean.class);

/**
* Creates an option whose values are of the given type. The constructor is private, so options can only be
* instantiated from within this class (for example as its {@code public static final} constants).
*
* @param valueClass the class of the value associated with this option; passed to the superclass constructor
*/
private SdkAdvancedAsyncClientOption(Class<T> valueClass) {
super(valueClass);
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
<rxjava3.version>3.1.5</rxjava3.version>
<commons-codec.verion>1.17.1</commons-codec.verion>
<jmh.version>1.37</jmh.version>
<awscrt.version>0.38.9</awscrt.version>
<awscrt.version>0.39.0</awscrt.version>

<!--Test dependencies -->
<junit5.version>5.10.0</junit5.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.net.URI;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -25,9 +26,11 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.crt.S3CrtFileIoConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand Down Expand Up @@ -361,6 +364,43 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer<S3CrtRetryConfigurat
*/
S3CrtAsyncClientBuilder disableS3ExpressSessionAuth(Boolean disableS3ExpressSessionAuth);

/**
* Controls how client performs file I/O operations. Only applies to file-based workloads.
* @param fileIoConfiguration the file options to be used
* @return an instance of this builder
*/
S3CrtAsyncClientBuilder fileIoConfiguration(S3CrtFileIoConfiguration fileIoConfiguration);

/**
 * Controls how the client performs file I/O operations. Only applies to file-based workloads.
 * <p>
 * Convenience overload: a fresh {@link S3CrtFileIoConfiguration.Builder} is handed to the given consumer, the
 * result is built, and the built configuration is passed to
 * {@link #fileIoConfiguration(S3CrtFileIoConfiguration)}.
 *
 * @param fileIoOptionsBuilder consumer that populates the file I/O configuration builder; must not be null
 * @return an instance of this builder
 */
default S3CrtAsyncClientBuilder fileIoConfiguration(Consumer<S3CrtFileIoConfiguration.Builder> fileIoOptionsBuilder) {
    Validate.paramNotNull(fileIoOptionsBuilder, "fileIoOptionsBuilder");

    // Let the caller's consumer populate a new builder, then delegate to the non-consumer overload.
    S3CrtFileIoConfiguration configuration = S3CrtFileIoConfiguration.builder()
                                                                     .applyMutation(fileIoOptionsBuilder)
                                                                     .build();
    return fileIoConfiguration(configuration);
}

/**
* Configure an advanced override option. These values are used very rarely, and the majority of SDK customers can ignore
* them.
*
* @param option The option to configure.
* @param value The value of the option.
* @param <T> The type of the option.
* @return an instance of this builder
*/
<T> S3CrtAsyncClientBuilder putAdvancedOption(SdkAdvancedAsyncClientOption<T> option, T value);

/**
* Configure the map of advanced override options. This will override all values currently configured. The values in the
* map must match the key type of the map, or a runtime exception will be raised.
* @param advancedOptions the options to configure
* @return an instance of this builder
*/
S3CrtAsyncClientBuilder advancedOptions(Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions);

@Override
S3AsyncClient build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.crt;

import java.util.Objects;
import software.amazon.awssdk.annotations.Immutable;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.ThreadSafe;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;

/**
* Configuration options that control how the client performs file I/O operations. Only applies to file-based workloads.
*/
@SdkPublicApi
@Immutable
@ThreadSafe
public final class S3CrtFileIoConfiguration
implements ToCopyableBuilder<S3CrtFileIoConfiguration.Builder, S3CrtFileIoConfiguration> {
private final Boolean uploadBufferDisabled;
private final Double diskThroughputGbps;

/**
* Creates a configuration holding the values currently set on the given builder. Values that were never set on
* the builder are copied as {@code null}.
*
* @param builder the builder to read the option values from
*/
private S3CrtFileIoConfiguration(DefaultBuilder builder) {
this.uploadBufferDisabled = builder.uploadBufferDisabled;
this.diskThroughputGbps = builder.diskThroughputGbps;
}

/**
* Creates a default builder for {@link S3CrtFileIoConfiguration}. Every option on the returned builder starts
* unset.
*
* @return a new {@link Builder} instance
*/
public static Builder builder() {
return new DefaultBuilder();
}

/**
* Skip buffering the part in memory before sending the request. When set to true, the file content will not be buffered
* in memory and instead streamed to the http request.
* <p>
* Note: If upload buffering is still enabled, the CRT client will buffer <em>full parts</em> in
* memory, which could lead to out-of-memory for large files when the application does not have enough memory to fully
* buffer those parts in memory.
* <p>
* If set to true, also set the {@code diskThroughputGbps} to reasonably align with the available disk throughput.
* Otherwise, the transfer may fail with connection starvation.
* Defaults to false.
* @see SdkAdvancedAsyncClientOption#CRT_UPLOAD_FILE_DIRECT_IO
* @return if client should skip buffering in memory before sending the request.
*/
public Boolean uploadBufferDisabled() {
// Returned as configured; null when the option was not set on the builder.
return uploadBufferDisabled;
}

/**
* The estimated disk throughput in gigabits per second (Gbps).
* Only applied when {@code uploadBufferDisabled} is true.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like typo and should be {@code uploadBufferDisabled}

*
* When doing upload with streaming, it's important to set the disk throughput to prevent connection starvation.
* Note: The transfer may not reach the full available disk throughput when:
* 1. The disk is busy with other applications.
* 2. The OS cache caps the throughput; enable direct I/O via
* {@link SdkAdvancedAsyncClientOption#CRT_UPLOAD_FILE_DIRECT_IO} to get around this.
*
* @return disk throughput value in Gbps.
*/
public Double diskThroughputGbps() {
// Returned as configured; null when the option was not set on the builder.
return diskThroughputGbps;
}

@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    // Only instances of exactly this class can be equal.
    if (o == null || o.getClass() != getClass()) {
        return false;
    }

    S3CrtFileIoConfiguration other = (S3CrtFileIoConfiguration) o;
    // Both options are nullable, so compare them null-safely.
    return Objects.equals(uploadBufferDisabled, other.uploadBufferDisabled)
           && Objects.equals(diskThroughputGbps, other.diskThroughputGbps);
}

@Override
public int hashCode() {
    // Objects.hashCode yields 0 for null, so unset options contribute nothing to the hash.
    int hash = Objects.hashCode(uploadBufferDisabled);
    hash = 31 * hash + Objects.hashCode(diskThroughputGbps);
    return hash;
}

/**
* Returns a new builder pre-populated with the option values of this configuration.
*/
@Override
public Builder toBuilder() {
return new DefaultBuilder(this);
}

public interface Builder extends CopyableBuilder<Builder, S3CrtFileIoConfiguration> {
/**
 * Skip buffering the part in memory before sending the request. When set to true, the file content is not
 * buffered in memory and is instead streamed to the HTTP request.
 * <p>
 * If set to true, also set {@code diskThroughputGbps} to reasonably align with the available disk throughput.
 * Otherwise, the transfer may fail with connection starvation.
 * Defaults to false.
 *
 * @param uploadBufferDisabled whether the client should skip buffering in memory and stream the file instead
 * @return The builder for method chaining.
 */
Builder uploadBufferDisabled(Boolean uploadBufferDisabled);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Builder uploadBufferDisabled(Boolean shouldStream);
Builder uploadBufferDisabled(Boolean uploadBufferDisabled);


/**
* The estimated disk throughput in gigabits per second (Gbps).
* Only applied when {@code uploadBufferDisabled} is true.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also mention in Java doc why shouldStream is referenced as uploadBufferDisabled in Java client?

*
* When doing upload with streaming, it's important to set the disk throughput to prevent connection starvation.
* Note: The transfer may not reach the full available disk throughput when:
* 1. The disk is busy with other applications.
* 2. The OS cache caps the throughput; enable direct I/O via
* {@link SdkAdvancedAsyncClientOption#CRT_UPLOAD_FILE_DIRECT_IO} to get around this.
*
* @param diskThroughputGbps the disk throughput in Gbps
* @return The builder for method chaining.
*/
Builder diskThroughputGbps(Double diskThroughputGbps);

@Override
S3CrtFileIoConfiguration build();
}

/**
 * Mutable {@link Builder} implementation. Holds the option values until {@link #build()} copies them into an
 * {@link S3CrtFileIoConfiguration}.
 */
private static final class DefaultBuilder implements Builder {
    private Boolean uploadBufferDisabled;
    private Double diskThroughputGbps;

    /** Creates a builder with every option unset. */
    private DefaultBuilder() {
    }

    /**
     * Creates a builder seeded with the values of an existing configuration.
     *
     * @param source the configuration whose values are copied
     */
    private DefaultBuilder(S3CrtFileIoConfiguration source) {
        uploadBufferDisabled = source.uploadBufferDisabled;
        diskThroughputGbps = source.diskThroughputGbps;
    }

    @Override
    public Builder uploadBufferDisabled(Boolean uploadBufferDisabled) {
        this.uploadBufferDisabled = uploadBufferDisabled;
        return this;
    }

    @Override
    public Builder diskThroughputGbps(Double throughputGbps) {
        diskThroughputGbps = throughputGbps;
        return this;
    }

    @Override
    public S3CrtFileIoConfiguration build() {
        // The configuration's constructor reads the current field values from this builder.
        return new S3CrtFileIoConfiguration(this);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand All @@ -47,6 +48,7 @@
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
Expand All @@ -67,6 +69,7 @@
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.crt.S3CrtFileIoConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.s3.internal.checksums.ChecksumsEnabledValidator;
Expand All @@ -78,6 +81,7 @@
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Validate;

Expand Down Expand Up @@ -222,7 +226,9 @@ private static S3CrtAsyncHttpClient.Builder initializeS3CrtAsyncHttpClient(Defau
.readBufferSizeInBytes(builder.readBufferSizeInBytes)
.httpConfiguration(builder.httpConfiguration)
.thresholdInBytes(builder.thresholdInBytes)
.maxNativeMemoryLimitInBytes(builder.maxNativeMemoryLimitInBytes);
.maxNativeMemoryLimitInBytes(builder.maxNativeMemoryLimitInBytes)
.fileIoConfiguration(builder.fileIoConfiguration)
.advancedOptions(builder.advancedOptions.build());

if (builder.retryConfiguration != null) {
nativeClientBuilder.standardRetryOptions(
Expand Down Expand Up @@ -256,7 +262,9 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
private Long thresholdInBytes;
private Executor futureCompletionExecutor;
private Boolean disableS3ExpressSessionAuth;
private S3CrtFileIoConfiguration fileIoConfiguration;

private AttributeMap.Builder advancedOptions = AttributeMap.builder();

@Override
public DefaultS3CrtClientBuilder credentialsProvider(AwsCredentialsProvider credentialsProvider) {
Expand Down Expand Up @@ -388,6 +396,24 @@ public DefaultS3CrtClientBuilder disableS3ExpressSessionAuth(Boolean disableS3Ex
return this;
}

@Override
public S3CrtAsyncClientBuilder fileIoConfiguration(S3CrtFileIoConfiguration fileIoConfiguration) {
// Stored as given (no null check or copy); replaces any previously set file I/O configuration.
this.fileIoConfiguration = fileIoConfiguration;
return this;
}

@Override
public <T> S3CrtAsyncClientBuilder putAdvancedOption(SdkAdvancedAsyncClientOption<T> option, T value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have new option class for CRT like SdkAdvancedAsyncCrtClientOption , else all the options which are available on SdkAdvancedAsyncClientOption might appear as supported for CRT and vice versa.

advancedOptions.put(option, value);
return this;
}

@Override
public S3CrtAsyncClientBuilder advancedOptions(Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions) {
// Adds the given entries to the options accumulated so far via putAll.
// NOTE(review): the interface Javadoc says this "will override all values currently configured"; putAll looks
// like a merge that keeps options not present in the given map - confirm which behavior is intended.
this.advancedOptions.putAll(advancedOptions);
return this;
}

@Override
public S3CrtAsyncClient build() {
return new DefaultS3CrtAsyncClient(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ private S3ClientOptions createS3ClientOption() {
.withMemoryLimitInBytes(s3NativeClientConfiguration.maxNativeMemoryLimitInBytes())
.withThroughputTargetGbps(s3NativeClientConfiguration.targetThroughputInGbps())
.withInitialReadWindowSize(initialWindowSize)
.withReadBackpressureEnabled(true);
.withReadBackpressureEnabled(true)
.withFileIoOptions(s3NativeClientConfiguration.fileIoOptions());

if (s3NativeClientConfiguration.standardRetryOptions() != null) {
options.withStandardRetryOptions(s3NativeClientConfiguration.standardRetryOptions());
Expand Down
Loading