Skip to content

Commit c82d494

Browse files
authored
Stream retry support part 2: Introduce a new split method in AsyncRequestBody that returns an SdkP… (#6346)
* Introduce a new split method in AsyncRequestBody that returns an SdkPublisher of ClosableAsyncRequestBody and use it in s3 multipart client * Fix build and add more tests * Refactoring based on API surface are review * Add more tests and address comments * Deprecate split, update documentation and add changelog entries
1 parent 40cedd3 commit c82d494

File tree

33 files changed

+1656
-382
lines changed

33 files changed

+1656
-382
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "deprecation",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Deprecate `AsyncRequestBody#split` in favor of `AsyncRequestBody#splitCloseable` that takes the same input but returns `SdkPublisher<CloseableAsyncRequestBody>`"
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Add `AsyncRequestBody#splitCloseable` API that returns a Publisher of `ClosableAsyncRequestBody`"
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Introduce CloseableAsyncRequestBody interface that extends both AsyncRequestBody and SdkAutoClosable interfaces"
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Introduce BufferedSplittableAsyncRequestBody that enables splitting into retryable sub-request bodies."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,6 @@ static AsyncRequestBody empty() {
500500
return fromBytes(new byte[0]);
501501
}
502502

503-
504503
/**
505504
* Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
506505
* portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size
@@ -513,25 +512,75 @@ static AsyncRequestBody empty() {
513512
* than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this
514513
* interface overrides this method.
515514
*
516-
* @see AsyncRequestBodySplitConfiguration
515+
* @deprecated use {@link #splitCloseable(AsyncRequestBodySplitConfiguration)} instead.
517516
*/
517+
@Deprecated
518518
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
519519
Validate.notNull(splitConfiguration, "splitConfiguration");
520+
return SplittingPublisher.builder()
521+
.asyncRequestBody(this)
522+
.splitConfiguration(splitConfiguration)
523+
.retryableSubAsyncRequestBodyEnabled(false)
524+
.build()
525+
.map(r -> r);
526+
}
520527

521-
return new SplittingPublisher(this, splitConfiguration);
528+
/**
529+
* Converts this {@link AsyncRequestBody} to a publisher of {@link CloseableAsyncRequestBody}s, each of which publishes
530+
* specific portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk
531+
* size is 2MB and the default buffer size is 8MB.
532+
*
533+
* <p>
534+
* The default implementation behaves the same as {@link #split(AsyncRequestBodySplitConfiguration)}. This behavior may
535+
* vary in different implementations.
536+
*
537+
* <p>
538+
* Caller is responsible for closing {@link CloseableAsyncRequestBody} when it is ready to be disposed to release any
539+
* resources.
540+
*
541+
* <p><b>Note:</b> This method is primarily intended for use by AWS SDK high-level libraries and internal components.
542+
* SDK customers should typically use higher-level APIs provided by service clients rather than calling this method directly.
543+
*
544+
* @see #splitCloseable(Consumer)
545+
* @see AsyncRequestBodySplitConfiguration
546+
*/
547+
default SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
548+
Validate.notNull(splitConfiguration, "splitConfiguration");
549+
return SplittingPublisher.builder()
550+
.asyncRequestBody(this)
551+
.splitConfiguration(splitConfiguration)
552+
.retryableSubAsyncRequestBodyEnabled(false)
553+
.build();
522554
}
523555

524556
/**
525557
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
526558
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
527559
*
528560
* @see #split(AsyncRequestBodySplitConfiguration)
561+
* @deprecated use {@link #splitCloseable(Consumer)} instead
529562
*/
563+
@Deprecated
530564
default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
531565
Validate.notNull(splitConfiguration, "splitConfiguration");
532566
return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
533567
}
534568

569+
/**
570+
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
571+
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
572+
*
573+
* <p><b>Note:</b> This method is primarily intended for use by AWS SDK high-level libraries and internal components.
574+
* SDK customers should typically use higher-level APIs provided by service clients rather than calling this method directly.
575+
*
576+
* @see #splitCloseable(AsyncRequestBodySplitConfiguration)
577+
*/
578+
default SdkPublisher<CloseableAsyncRequestBody> splitCloseable(
579+
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
580+
Validate.notNull(splitConfiguration, "splitConfiguration");
581+
return splitCloseable(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
582+
}
583+
535584
@SdkProtectedApi
536585
enum BodyType {
537586
FILE("File", "f"),
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import org.reactivestreams.Subscriber;
21+
import software.amazon.awssdk.annotations.SdkPublicApi;
22+
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
23+
import software.amazon.awssdk.utils.Validate;
24+
25+
/**
26+
* An {@link AsyncRequestBody} decorator that enables splitting into retryable sub-request bodies.
27+
*
28+
* <p>This wrapper allows any {@link AsyncRequestBody} to be split into multiple parts where each part
29+
* can be retried independently. When split, each sub-body buffers its portion of data, enabling
30+
* resubscription if a retry is needed (e.g., due to network failures or service errors).</p>
31+
*
32+
* <p><b>Retry Requirements:</b></p>
33+
* <p>Retry is only possible if all the data has been successfully buffered during the first subscription.
34+
* If the first subscriber fails to consume all the data (e.g., due to early cancellation or errors),
35+
* subsequent retry attempts will fail since the complete data set is not available for resubscription.</p>
36+
*
37+
* <p><b>Usage Example:</b></p>
38+
* {@snippet :
39+
* AsyncRequestBody originalBody = AsyncRequestBody.fromString("Hello World");
40+
* BufferedSplittableAsyncRequestBody retryableBody =
41+
* BufferedSplittableAsyncRequestBody.create(originalBody);
42+
* }
43+
*
44+
* <p><b>Performance Considerations:</b></p>
45+
* <p>This implementation buffers data in memory to enable retries, but memory usage is controlled by
46+
* the {@code bufferSizeInBytes} configuration. However, this buffering limits the ability to request
47+
* more data from the original AsyncRequestBody until buffered data is consumed (i.e., when subscribers
48+
* closes sub-body), which may increase latency compared to non-buffered implementations.
49+
*
50+
* @see AsyncRequestBody
51+
* @see AsyncRequestBodySplitConfiguration
52+
* @see CloseableAsyncRequestBody
53+
*/
54+
@SdkPublicApi
55+
public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody {
56+
private final AsyncRequestBody delegate;
57+
58+
private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate) {
59+
this.delegate = delegate;
60+
}
61+
62+
/**
63+
* Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}.
64+
*
65+
* @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null.
66+
* @return a new {@link BufferedSplittableAsyncRequestBody} instance
67+
* @throws NullPointerException if delegate is null
68+
*/
69+
public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) {
70+
Validate.paramNotNull(delegate, "delegate");
71+
return new BufferedSplittableAsyncRequestBody(delegate);
72+
}
73+
74+
@Override
75+
public Optional<Long> contentLength() {
76+
return delegate.contentLength();
77+
}
78+
79+
/**
80+
* Splits this request body into multiple retryable parts based on the provided configuration.
81+
*
82+
* <p>Each part returned by the publisher will be a {@link CloseableAsyncRequestBody} that buffers
83+
* its portion of data, enabling resubscription for retry scenarios. This is the key difference from non-buffered splitting -
84+
* each part can be safely retried without data loss.
85+
*
86+
* <p>The splitting process respects the chunk size and buffer size specified in the configuration
87+
* to optimize memory usage.
88+
*
89+
* <p>The subscriber MUST close each {@link CloseableAsyncRequestBody} to ensure resource is released
90+
*
91+
* @param splitConfiguration configuration specifying how to split the request body
92+
* @return a publisher that emits retryable closable request body parts
93+
* @see AsyncRequestBodySplitConfiguration
94+
*/
95+
@Override
96+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
97+
return SplittingPublisher.builder()
98+
.asyncRequestBody(this)
99+
.splitConfiguration(splitConfiguration)
100+
.retryableSubAsyncRequestBodyEnabled(true)
101+
.build();
102+
}
103+
104+
@Override
105+
public void subscribe(Subscriber<? super ByteBuffer> s) {
106+
delegate.subscribe(s);
107+
}
108+
109+
@Override
110+
public String body() {
111+
return delegate.body();
112+
}
113+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import software.amazon.awssdk.annotations.SdkPublicApi;
19+
import software.amazon.awssdk.utils.SdkAutoCloseable;
20+
21+
/**
22+
* An extension of {@link AsyncRequestBody} that is closable.
23+
*/
24+
@SdkPublicApi
25+
public interface CloseableAsyncRequestBody extends AsyncRequestBody, SdkAutoCloseable {
26+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import software.amazon.awssdk.annotations.SdkProtectedApi;
2424
import software.amazon.awssdk.core.async.AsyncRequestBody;
2525
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
26+
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
2627
import software.amazon.awssdk.core.async.SdkPublisher;
2728
import software.amazon.awssdk.utils.Logger;
2829
import software.amazon.awssdk.utils.Validate;
@@ -76,6 +77,17 @@ public SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfig
7677
return delegate.split(splitConfiguration);
7778
}
7879

80+
@Override
81+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
82+
return delegate.splitCloseable(splitConfiguration);
83+
}
84+
85+
@Override
86+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(
87+
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
88+
return delegate.splitCloseable(splitConfiguration);
89+
}
90+
7991
@Override
8092
public void subscribe(Subscriber<? super ByteBuffer> s) {
8193
invoke(() -> listener.publisherSubscribe(s), "publisherSubscribe");

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody, SdkA
7676
private final Object lock = new Object();
7777
private boolean closed;
7878

79-
private ByteBuffersAsyncRequestBody(String mimetype, Long length, List<ByteBuffer> buffers) {
79+
private ByteBuffersAsyncRequestBody(String mimetype,
80+
Long length,
81+
List<ByteBuffer> buffers) {
8082
this.mimetype = mimetype;
8183
this.buffers = buffers;
8284
this.length = length;
@@ -121,6 +123,10 @@ public String body() {
121123
return BodyType.BYTES.getName();
122124
}
123125

126+
public static ByteBuffersAsyncRequestBody of(List<ByteBuffer> buffers, long length) {
127+
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
128+
}
129+
124130
public static ByteBuffersAsyncRequestBody of(List<ByteBuffer> buffers) {
125131
long length = buffers.stream()
126132
.mapToLong(ByteBuffer::remaining)
@@ -129,7 +135,11 @@ public static ByteBuffersAsyncRequestBody of(List<ByteBuffer> buffers) {
129135
}
130136

131137
public static ByteBuffersAsyncRequestBody of(ByteBuffer... buffers) {
132-
return of(Arrays.asList(buffers));
138+
List<ByteBuffer> byteBuffers = Arrays.asList(buffers);
139+
long length = byteBuffers.stream()
140+
.mapToLong(ByteBuffer::remaining)
141+
.sum();
142+
return of(byteBuffers, length);
133143
}
134144

135145
public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) {

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import software.amazon.awssdk.annotations.SdkInternalApi;
3535
import software.amazon.awssdk.core.async.AsyncRequestBody;
3636
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
37+
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
3738
import software.amazon.awssdk.core.async.SdkPublisher;
3839
import software.amazon.awssdk.core.internal.util.Mimetype;
3940
import software.amazon.awssdk.core.internal.util.NoopSubscription;
@@ -86,6 +87,11 @@ public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration s
8687
return new FileAsyncRequestBodySplitHelper(this, splitConfiguration).split();
8788
}
8889

90+
@Override
91+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
92+
return split(splitConfiguration).map(body -> new ClosableAsyncRequestBodyWrapper(body));
93+
}
94+
8995
public Path path() {
9096
return path;
9197
}
@@ -436,4 +442,32 @@ private void signalOnError(Throwable t) {
436442
private static AsynchronousFileChannel openInputChannel(Path path) throws IOException {
437443
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
438444
}
445+
446+
private static class ClosableAsyncRequestBodyWrapper implements CloseableAsyncRequestBody {
447+
private final AsyncRequestBody delegate;
448+
449+
ClosableAsyncRequestBodyWrapper(AsyncRequestBody body) {
450+
this.delegate = body;
451+
}
452+
453+
@Override
454+
public Optional<Long> contentLength() {
455+
return delegate.contentLength();
456+
}
457+
458+
@Override
459+
public void subscribe(Subscriber<? super ByteBuffer> s) {
460+
delegate.subscribe(s);
461+
}
462+
463+
@Override
464+
public void close() {
465+
// no op
466+
}
467+
468+
@Override
469+
public String body() {
470+
return delegate.body();
471+
}
472+
}
439473
}

0 commit comments

Comments
 (0)