Skip to content

Commit 57cbc1f

Browse files
committed
Add more tests and address comments
1 parent 004733b commit 57cbc1f

File tree

21 files changed

+622
-183
lines changed

21 files changed

+622
-183
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,38 @@
2020
import org.reactivestreams.Subscriber;
2121
import software.amazon.awssdk.annotations.SdkPublicApi;
2222
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
23+
import software.amazon.awssdk.utils.Validate;
2324

2425
/**
25-
* An {@link AsyncRequestBody} decorator that can be split into buffered sub {@link AsyncRequestBody}s. Each sub
26-
* {@link AsyncRequestBody} can be retried/resubscribed if all data has been successfully been published to first subscriber.
26+
* An {@link AsyncRequestBody} decorator that enables splitting into retryable sub-request bodies.
27+
*
28+
* <p>This wrapper allows any {@link AsyncRequestBody} to be split into multiple parts where each part
29+
* can be retried independently. When split, each sub-body buffers its portion of data, enabling
30+
* resubscription if a retry is needed (e.g., due to network failures or service errors).</p>
31+
*
32+
* <p><b>Usage Example:</b></p>
33+
* <pre>{@code
34+
* AsyncRequestBody originalBody = AsyncRequestBody.fromString("Hello World");
35+
* BufferedSplittableAsyncRequestBody retryableBody =
36+
* BufferedSplittableAsyncRequestBody.create(originalBody);
37+
*
38+
* AsyncRequestBodySplitConfiguration config = AsyncRequestBodySplitConfiguration.builder()
39+
* .chunkSizeInBytes(1024)
40+
* .bufferSizeInBytes(2048)
41+
* .build();
42+
*
43+
* SdkPublisher<ClosableAsyncRequestBody> parts = retryableBody.splitClosable(config);
44+
* }</pre>
45+
*
46+
* <p><b>Performance Considerations:</b></p>
47+
* <p>This implementation buffers data in memory to enable retries, but memory usage is controlled by
48+
* the {@code bufferSizeInBytes} configuration. However, this buffering limits the ability to request
49+
* more data from the original AsyncRequestBody until buffered data is consumed (i.e., when subscribers
50+
* closes sub-body), which may increase latency compared to non-buffered implementations.
51+
*
52+
* @see AsyncRequestBody
53+
* @see AsyncRequestBodySplitConfiguration
54+
* @see ClosableAsyncRequestBody
2755
*/
2856
@SdkPublicApi
2957
public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody {
@@ -33,7 +61,15 @@ private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate) {
3361
this.delegate = delegate;
3462
}
3563

64+
/**
65+
* Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}.
66+
*
67+
* @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null.
68+
* @return a new {@link BufferedSplittableAsyncRequestBody} instance
69+
* @throws NullPointerException if delegate is null
70+
*/
3671
public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) {
72+
Validate.paramNotNull(delegate, "delegate");
3773
return new BufferedSplittableAsyncRequestBody(delegate);
3874
}
3975

@@ -42,6 +78,22 @@ public Optional<Long> contentLength() {
4278
return delegate.contentLength();
4379
}
4480

81+
/**
82+
* Splits this request body into multiple retryable parts based on the provided configuration.
83+
*
84+
* <p>Each part returned by the publisher will be a {@link ClosableAsyncRequestBody} that buffers
85+
* its portion of data, enabling resubscription for retry scenarios. This is the key difference from non-buffered splitting -
86+
* each part can be safely retried without data loss.
87+
*
88+
* <p>The splitting process respects the chunk size and buffer size specified in the configuration
89+
* to optimize memory usage.
90+
*
91+
* <p>The subscriber MUST close each {@link ClosableAsyncRequestBody} to ensure resource is released
92+
*
93+
* @param splitConfiguration configuration specifying how to split the request body
94+
* @return a publisher that emits retryable closable request body parts
95+
* @see AsyncRequestBodySplitConfiguration
96+
*/
4597
@Override
4698
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
4799
return new SplittingPublisher(this, splitConfiguration, true);

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ClosableAsyncRequestBody.java renamed to core/sdk-core/src/main/java/software/amazon/awssdk/core/async/CloseableAsyncRequestBody.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@
2222
* An extension of {@link AsyncRequestBody} that is closable.
2323
*/
2424
@SdkPublicApi
25-
public interface ClosableAsyncRequestBody extends AsyncRequestBody, SdkAutoCloseable {
25+
public interface CloseableAsyncRequestBody extends AsyncRequestBody, SdkAutoCloseable {
2626
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import software.amazon.awssdk.annotations.SdkProtectedApi;
2424
import software.amazon.awssdk.core.async.AsyncRequestBody;
2525
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
26-
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
26+
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
2727
import software.amazon.awssdk.core.async.SdkPublisher;
2828
import software.amazon.awssdk.utils.Logger;
2929
import software.amazon.awssdk.utils.Validate;
@@ -78,14 +78,14 @@ public SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfig
7878
}
7979

8080
@Override
81-
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
82-
return delegate.splitClosable(splitConfiguration);
81+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
82+
return delegate.splitCloseable(splitConfiguration);
8383
}
8484

8585
@Override
86-
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(
86+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(
8787
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
88-
return delegate.splitClosable(splitConfiguration);
88+
return delegate.splitCloseable(splitConfiguration);
8989
}
9090

9191
@Override

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import software.amazon.awssdk.annotations.SdkInternalApi;
3535
import software.amazon.awssdk.core.async.AsyncRequestBody;
3636
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
37-
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
37+
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
3838
import software.amazon.awssdk.core.async.SdkPublisher;
3939
import software.amazon.awssdk.core.internal.util.Mimetype;
4040
import software.amazon.awssdk.core.internal.util.NoopSubscription;
@@ -88,7 +88,7 @@ public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration s
8888
}
8989

9090
@Override
91-
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
91+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
9292
return split(splitConfiguration).map(body -> new ClosableAsyncRequestBodyWrapper(body));
9393
}
9494

@@ -443,7 +443,7 @@ private static AsynchronousFileChannel openInputChannel(Path path) throws IOExce
443443
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
444444
}
445445

446-
private static class ClosableAsyncRequestBodyWrapper implements ClosableAsyncRequestBody {
446+
private static class ClosableAsyncRequestBodyWrapper implements CloseableAsyncRequestBody {
447447
private final AsyncRequestBody delegate;
448448

449449
ClosableAsyncRequestBodyWrapper(AsyncRequestBody body) {

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ public Optional<Long> contentLength() {
6262

6363
public void send(ByteBuffer data) {
6464
log.debug(() -> String.format("Sending bytebuffer %s to part %d", data, partNumber));
65-
int length = data.remaining();
65+
long length = data.remaining();
6666
bufferedLength += length;
67-
onNumBytesReceived.accept((long) length);
67+
onNumBytesReceived.accept(length);
6868
delegate.send(data).whenComplete((r, t) -> {
69-
onNumBytesConsumed.accept((long) length);
69+
onNumBytesConsumed.accept(length);
7070
if (t != null) {
7171
error(t);
7272
}
@@ -92,11 +92,6 @@ public long receivedBytesLength() {
9292
return bufferedLength;
9393
}
9494

95-
@Override
96-
public boolean contentLengthKnown() {
97-
return contentLengthKnown;
98-
}
99-
10095
@Override
10196
public int partNumber() {
10297
return partNumber;
@@ -113,8 +108,10 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
113108
} else {
114109
s.onSubscribe(new NoopSubscription(s));
115110
s.onError(NonRetryableException.create(
116-
"A retry was attempted, but the provided source AsyncRequestBody does not "
117-
+ "support splitting to retryable AsyncRequestBody. Consider using BufferedSplittableAsyncRequestBody."));
111+
"Multiple subscriptions detected. This could happen due to a retry attempt. The AsyncRequestBody implementation"
112+
+ " provided does not support splitting to retryable/resubscribable AsyncRequestBody. If you need retry "
113+
+ "capability or multiple subscriptions, consider using BufferedSplittableAsyncRequestBody to wrap your "
114+
+ "AsyncRequestBody."));
118115
}
119116
}
120117

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ public Optional<Long> contentLength() {
7575
@Override
7676
public void send(ByteBuffer data) {
7777
log.trace(() -> String.format("Sending bytebuffer %s to part number %d", data, partNumber));
78-
int length = data.remaining();
78+
long length = data.remaining();
7979
bufferedLength += length;
8080

81-
onNumBytesReceived.accept((long) length);
81+
onNumBytesReceived.accept(length);
8282
delegate.send(data.asReadOnlyBuffer()).whenComplete((r, t) -> {
8383
if (t != null) {
8484
delegate.error(t);
@@ -94,7 +94,9 @@ public void complete() {
9494
log.debug(() -> "Received complete() for part number: " + partNumber);
9595
// ByteBuffersAsyncRequestBody MUST be created before we complete the current
9696
// request because retry may happen right after
97-
bufferedAsyncRequestBody = ByteBuffersAsyncRequestBody.of(buffers, bufferedLength);
97+
synchronized (buffersLock) {
98+
bufferedAsyncRequestBody = ByteBuffersAsyncRequestBody.of(buffers, bufferedLength);
99+
}
98100
delegate.complete().exceptionally(e -> {
99101
delegate.error(e);
100102
return null;
@@ -121,7 +123,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
121123
if (bufferedAsyncRequestBody == null) {
122124
s.onSubscribe(new NoopSubscription(s));
123125
s.onError(NonRetryableException.create(
124-
"A retry was attempted, but data is not buffered successfully for retry, partNumber " + partNumber));
126+
"A retry was attempted, but data is not buffered successfully for retry for partNumber: " + partNumber));
125127
return;
126128
}
127129
bufferedAsyncRequestBody.subscribe(s);
@@ -139,19 +141,14 @@ public void close() {
139141
buffers = null;
140142
}
141143
bufferedAsyncRequestBody.close();
142-
log.debug(() -> "requesting data after closing" + partNumber);
144+
bufferedAsyncRequestBody = null;
143145
}
144146
} catch (Throwable e) {
145147
log.warn(() -> String.format("Unexpected error thrown from cleaning up AsyncRequestBody for part number %d, "
146148
+ "resource may be leaked", partNumber));
147149
}
148150
}
149151

150-
@Override
151-
public boolean contentLengthKnown() {
152-
return contentLengthKnown;
153-
}
154-
155152
@Override
156153
public int partNumber() {
157154
return partNumber;

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import software.amazon.awssdk.annotations.SdkInternalApi;
2525
import software.amazon.awssdk.core.async.AsyncRequestBody;
2626
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
27-
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
27+
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
2828
import software.amazon.awssdk.core.async.SdkPublisher;
2929
import software.amazon.awssdk.utils.Logger;
3030
import software.amazon.awssdk.utils.Validate;
@@ -38,43 +38,49 @@
3838
* Otherwise, it is sent after the entire content for that chunk is buffered. This is required to get content length.
3939
*/
4040
@SdkInternalApi
41-
public class SplittingPublisher implements SdkPublisher<ClosableAsyncRequestBody> {
41+
public class SplittingPublisher implements SdkPublisher<CloseableAsyncRequestBody> {
4242
private static final Logger log = Logger.loggerFor(SplittingPublisher.class);
4343
private final AsyncRequestBody upstreamPublisher;
4444
private final SplittingSubscriber splittingSubscriber;
45-
private final SimplePublisher<ClosableAsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
45+
private final SimplePublisher<CloseableAsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
4646
private final long chunkSizeInBytes;
4747
private final long bufferSizeInBytes;
48-
private final boolean enableRetryableSubAsyncRequestBody;
48+
private final boolean retryableSubAsyncRequestBodyEnabled;
4949
private final AtomicBoolean currentBodySent = new AtomicBoolean(false);
5050
private final String sourceBodyName;
5151

52-
public SplittingPublisher(AsyncRequestBody asyncRequestBody,
53-
AsyncRequestBodySplitConfiguration splitConfiguration,
54-
boolean enableRetryableSubAsyncRequestBody) {
55-
this.upstreamPublisher = Validate.paramNotNull(asyncRequestBody, "asyncRequestBody");
56-
Validate.notNull(splitConfiguration, "splitConfiguration");
57-
this.chunkSizeInBytes = splitConfiguration.chunkSizeInBytes() == null ?
52+
private SplittingPublisher(Builder builder) {
53+
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
54+
Validate.notNull(builder.splitConfiguration, "splitConfiguration");
55+
this.chunkSizeInBytes = builder.splitConfiguration.chunkSizeInBytes() == null ?
5856
AsyncRequestBodySplitConfiguration.defaultConfiguration().chunkSizeInBytes() :
59-
splitConfiguration.chunkSizeInBytes();
57+
builder.splitConfiguration.chunkSizeInBytes();
6058

61-
this.bufferSizeInBytes = splitConfiguration.bufferSizeInBytes() == null ?
59+
this.bufferSizeInBytes = builder.splitConfiguration.bufferSizeInBytes() == null ?
6260
AsyncRequestBodySplitConfiguration.defaultConfiguration().bufferSizeInBytes() :
63-
splitConfiguration.bufferSizeInBytes();
61+
builder.splitConfiguration.bufferSizeInBytes();
6462

6563
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
6664

67-
this.enableRetryableSubAsyncRequestBody = enableRetryableSubAsyncRequestBody;
68-
this.sourceBodyName = asyncRequestBody.body();
65+
this.retryableSubAsyncRequestBodyEnabled = Validate.paramNotNull(builder.retryableSubAsyncRequestBodyEnabled,
66+
"retryableSubAsyncRequestBodyEnabled");
67+
this.sourceBodyName = builder.asyncRequestBody.body();
6968
if (!upstreamPublisher.contentLength().isPresent()) {
7069
Validate.isTrue(bufferSizeInBytes >= chunkSizeInBytes,
7170
"bufferSizeInBytes must be larger than or equal to " +
7271
"chunkSizeInBytes if the content length is unknown");
7372
}
7473
}
7574

75+
/**
76+
* Returns a newly initialized builder object for a {@link SplittingPublisher}
77+
*/
78+
public static Builder builder() {
79+
return new Builder();
80+
}
81+
7682
@Override
77-
public void subscribe(Subscriber<? super ClosableAsyncRequestBody> downstreamSubscriber) {
83+
public void subscribe(Subscriber<? super CloseableAsyncRequestBody> downstreamSubscriber) {
7884
downstreamPublisher.subscribe(downstreamSubscriber);
7985
upstreamPublisher.subscribe(splittingSubscriber);
8086
}
@@ -123,7 +129,7 @@ private SubAsyncRequestBody initializeNextDownstreamBody(boolean contentLengthKn
123129
.sourceBodyName(sourceBodyName)
124130
.build();
125131

126-
if (enableRetryableSubAsyncRequestBody) {
132+
if (retryableSubAsyncRequestBodyEnabled) {
127133
body = new RetryableSubAsyncRequestBody(config);
128134
} else {
129135
body = new NonRetryableSubAsyncRequestBody(config);
@@ -206,15 +212,16 @@ private int amountRemainingInChunk() {
206212

207213
private void completeCurrentBody() {
208214
log.debug(() -> "completeCurrentBody for part " + currentBody.partNumber());
209-
// For unknown content length, we always create a new DownstreamBody because we don't know if there is data
210-
// left or not, so we need to only send the body if there is actually data
211215
long bufferedLength = currentBody.receivedBytesLength();
216+
// For unknown content length, we always create a new DownstreamBody once the current one is sent
217+
// because we don't know if there is data
218+
// left or not, so we need to check the length and only send the body if there is actually data
212219
if (bufferedLength == 0) {
213220
return;
214221
}
215222

216223
Long totalLength = currentBody.maxLength();
217-
if (currentBody.contentLengthKnown() && totalLength != bufferedLength) {
224+
if (upstreamSize != null && totalLength != bufferedLength) {
218225
upstreamSubscription.cancel();
219226
downstreamPublisher.error(new IllegalStateException(
220227
String.format("Content length of buffered data mismatches "
@@ -294,4 +301,44 @@ private void addDataBuffered(long length) {
294301
}
295302
}
296303
}
304+
305+
public static final class Builder {
306+
private AsyncRequestBody asyncRequestBody;
307+
private AsyncRequestBodySplitConfiguration splitConfiguration;
308+
private Boolean retryableSubAsyncRequestBodyEnabled;
309+
310+
private Builder() {
311+
}
312+
313+
/**
314+
* Sets the AsyncRequestBody to be split.
315+
*/
316+
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
317+
this.asyncRequestBody = asyncRequestBody;
318+
return this;
319+
}
320+
321+
/**
322+
* Sets the split configuration.
323+
*/
324+
public Builder splitConfiguration(AsyncRequestBodySplitConfiguration splitConfiguration) {
325+
this.splitConfiguration = splitConfiguration;
326+
return this;
327+
}
328+
329+
/**
330+
* Sets whether to enable retryable sub async request bodies.
331+
*/
332+
public Builder retryableSubAsyncRequestBodyEnabled(Boolean retryableSubAsyncRequestBodyEnabled) {
333+
this.retryableSubAsyncRequestBodyEnabled = retryableSubAsyncRequestBodyEnabled;
334+
return this;
335+
}
336+
337+
/**
338+
* Builds a {@link SplittingPublisher} object based on the values held by this builder.
339+
*/
340+
public SplittingPublisher build() {
341+
return new SplittingPublisher(this);
342+
}
343+
}
297344
}

0 commit comments

Comments
 (0)