Skip to content

Commit 004733b

Browse files
committed
Refactoring based on API surface are review
1 parent 3c91cc9 commit 004733b

20 files changed

+828
-416
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import software.amazon.awssdk.annotations.SdkPublicApi;
3434
import software.amazon.awssdk.core.FileRequestBodyConfiguration;
3535
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
36-
import software.amazon.awssdk.core.internal.async.ClosableAsyncRequestBodyAdaptor;
3736
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3837
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
3938
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
@@ -501,22 +500,23 @@ static AsyncRequestBody empty() {
501500
return fromBytes(new byte[0]);
502501
}
503502

504-
505503
/**
506504
* Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
507505
* portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size
508506
* is 2MB and the default buffer size is 8MB.
509507
*
510508
* <p>
511-
* Each divided {@link AsyncRequestBody} is sent after the entire content for that chunk is buffered.
509+
* By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is
510+
* delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after
511+
* the entire content for that chunk is buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger
512+
* than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this
513+
* interface overrides this method.
512514
*
513515
* @see AsyncRequestBodySplitConfiguration
514-
* @deprecated Use {@link #splitV2(AsyncRequestBodySplitConfiguration)} instead.
515516
*/
516-
@Deprecated
517517
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
518518
Validate.notNull(splitConfiguration, "splitConfiguration");
519-
return splitV2(splitConfiguration).map(body -> new ClosableAsyncRequestBodyAdaptor(body));
519+
return new SplittingPublisher(this, splitConfiguration, false).map(r -> r);
520520
}
521521

522522
/**
@@ -525,27 +525,26 @@ default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration
525525
* size is 2MB and the default buffer size is 8MB.
526526
*
527527
* <p>
528-
* Each divided {@link ClosableAsyncRequestBody} is sent after the entire content for that chunk is buffered. This behavior
529-
* may be different if a specific implementation of this interface overrides this method.
528+
* The default implementation behaves the same as {@link #split(AsyncRequestBodySplitConfiguration)}. This behavior may
529+
* vary in different implementations.
530530
*
531531
* <p>
532-
* Each {@link ClosableAsyncRequestBody} MUST be closed by the user when it is ready to be disposed.
532+
* Caller is responsible for closing {@link ClosableAsyncRequestBody} when it is ready to be disposed to release any
533+
* resources.
533534
*
534535
* @see AsyncRequestBodySplitConfiguration
535536
*/
536-
default SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
537+
default SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
537538
Validate.notNull(splitConfiguration, "splitConfiguration");
538-
return new SplittingPublisher(this, splitConfiguration);
539+
return new SplittingPublisher(this, splitConfiguration, false);
539540
}
540541

541542
/**
542543
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
543544
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
544545
*
545546
* @see #split(AsyncRequestBodySplitConfiguration)
546-
* @deprecated Use {@link #splitV2(Consumer)} instead.
547547
*/
548-
@Deprecated
549548
default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
550549
Validate.notNull(splitConfiguration, "splitConfiguration");
551550
return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
@@ -555,12 +554,12 @@ default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfi
555554
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
556555
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
557556
*
558-
* @see #splitV2(Consumer)
557+
* @see #splitClosable(Consumer)
559558
*/
560-
default SdkPublisher<ClosableAsyncRequestBody> splitV2(
559+
default SdkPublisher<ClosableAsyncRequestBody> splitClosable(
561560
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
562561
Validate.notNull(splitConfiguration, "splitConfiguration");
563-
return splitV2(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
562+
return splitClosable(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
564563
}
565564

566565
@SdkProtectedApi
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import org.reactivestreams.Subscriber;
21+
import software.amazon.awssdk.annotations.SdkPublicApi;
22+
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
23+
24+
/**
25+
* An {@link AsyncRequestBody} decorator that can be split into buffered sub {@link AsyncRequestBody}s. Each sub
26+
* {@link AsyncRequestBody} can be retried/resubscribed if all data has been successfully been published to first subscriber.
27+
*/
28+
@SdkPublicApi
29+
public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody {
30+
private final AsyncRequestBody delegate;
31+
32+
private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate) {
33+
this.delegate = delegate;
34+
}
35+
36+
public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) {
37+
return new BufferedSplittableAsyncRequestBody(delegate);
38+
}
39+
40+
@Override
41+
public Optional<Long> contentLength() {
42+
return delegate.contentLength();
43+
}
44+
45+
@Override
46+
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
47+
return new SplittingPublisher(this, splitConfiguration, true);
48+
}
49+
50+
@Override
51+
public void subscribe(Subscriber<? super ByteBuffer> s) {
52+
delegate.subscribe(s);
53+
}
54+
55+
@Override
56+
public String body() {
57+
return delegate.body();
58+
}
59+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ public SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfig
7878
}
7979

8080
@Override
81-
public SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
82-
return delegate.splitV2(splitConfiguration);
81+
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
82+
return delegate.splitClosable(splitConfiguration);
8383
}
8484

8585
@Override
86-
public SdkPublisher<ClosableAsyncRequestBody> splitV2(
86+
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(
8787
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
88-
return delegate.splitV2(splitConfiguration);
88+
return delegate.splitClosable(splitConfiguration);
8989
}
9090

9191
@Override

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ public String body() {
123123
return BodyType.BYTES.getName();
124124
}
125125

126+
public static ByteBuffersAsyncRequestBody of(List<ByteBuffer> buffers, long length) {
127+
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
128+
}
129+
126130
public static ByteBuffersAsyncRequestBody of(List<ByteBuffer> buffers) {
127131
long length = buffers.stream()
128132
.mapToLong(ByteBuffer::remaining)
@@ -131,7 +135,11 @@ public static ByteBuffersAsyncRequestBody of(List<ByteBuffer> buffers) {
131135
}
132136

133137
public static ByteBuffersAsyncRequestBody of(ByteBuffer... buffers) {
134-
return of(Arrays.asList(buffers));
138+
List<ByteBuffer> byteBuffers = Arrays.asList(buffers);
139+
long length = byteBuffers.stream()
140+
.mapToLong(ByteBuffer::remaining)
141+
.sum();
142+
return of(byteBuffers, length);
135143
}
136144

137145
public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) {

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ClosableAsyncRequestBodyAdaptor.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration s
8888
}
8989

9090
@Override
91-
public SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
91+
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
9292
return split(splitConfiguration).map(body -> new ClosableAsyncRequestBodyWrapper(body));
9393
}
9494

@@ -444,24 +444,30 @@ private static AsynchronousFileChannel openInputChannel(Path path) throws IOExce
444444
}
445445

446446
private static class ClosableAsyncRequestBodyWrapper implements ClosableAsyncRequestBody {
447-
private final AsyncRequestBody body;
447+
private final AsyncRequestBody delegate;
448448

449449
ClosableAsyncRequestBodyWrapper(AsyncRequestBody body) {
450-
this.body = body;
450+
this.delegate = body;
451451
}
452452

453453
@Override
454454
public Optional<Long> contentLength() {
455-
return body.contentLength();
455+
return delegate.contentLength();
456456
}
457457

458458
@Override
459459
public void subscribe(Subscriber<? super ByteBuffer> s) {
460-
body.subscribe(s);
460+
delegate.subscribe(s);
461461
}
462462

463463
@Override
464464
public void close() {
465+
// no op
466+
}
467+
468+
@Override
469+
public String body() {
470+
return delegate.body();
465471
}
466472
}
467473
}

0 commit comments

Comments
 (0)