Skip to content

Commit 6632e74

Browse files
authored
Fix NPE issue in multipart S3 client (#6594)
* Fix NPE issue thrown when using multipart S3 client to upload an object containing empty content without supplying a content length * Apply suggestion from @zoewangg * Address feedback
1 parent 909f68e commit 6632e74

File tree

5 files changed

+116
-7
lines changed

5 files changed

+116
-7
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon S3",
4+
"contributor": "",
5+
"description": "Fix NPE issue thrown when using multipart S3 client to upload an object containing empty content without supplying a content length. See [#6464](https://github.com/aws/aws-sdk-java-v2/issues/6464)"
6+
}

services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import software.amazon.awssdk.utils.BinaryUtils;
9090
import software.amazon.awssdk.utils.IoUtils;
9191
import software.amazon.awssdk.utils.Md5Utils;
92+
import software.amazon.awssdk.utils.StringInputStream;
9293

9394
@Timeout(value = 60, unit = SECONDS)
9495
public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTestBase {
@@ -102,6 +103,7 @@ public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTest
102103
private static ExecutorService executorService = Executors.newFixedThreadPool(5);
103104
private static byte[] bytes;
104105
private static byte[] expectedChecksum;
106+
private static byte[] expectedChecksumForEmptyString;
105107

106108
@BeforeAll
107109
public static void setup() throws Exception {
@@ -111,6 +113,7 @@ public static void setup() throws Exception {
111113
testFile = new RandomTempFile(OBJ_SIZE);
112114
bytes = Files.readAllBytes(testFile.toPath());
113115
expectedChecksum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath()));
116+
expectedChecksumForEmptyString = ChecksumUtils.computeCheckSum(new StringInputStream(""));
114117
mpuS3Client = S3AsyncClient
115118
.builder()
116119
.region(DEFAULT_REGION)
@@ -137,8 +140,14 @@ public static Stream<Arguments> asyncRequestBodies() {
137140
executorService)),
138141
Arguments.of("inputStream_unknownLength",
139142
AsyncRequestBody.fromInputStream(new ByteArrayInputStream(bytes), null,
140-
executorService))
141-
);
143+
executorService)));
144+
}
145+
146+
public static Stream<Arguments> emptyAsyncRequestBodies() {
147+
return Stream.of(Arguments.of("knownLength", AsyncRequestBody.fromString("")),
148+
Arguments.of("unknownLength",
149+
AsyncRequestBody.fromInputStream(new StringInputStream(""), null,
150+
executorService)));
142151
}
143152

144153
@BeforeEach
@@ -181,6 +190,17 @@ void putObject_variousRequestBody_objectSentCorrectly(String description, AsyncR
181190
assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedChecksum);
182191
}
183192

193+
@ParameterizedTest
194+
@MethodSource("emptyAsyncRequestBodies")
195+
void putObject_variousEmptyRequestBody_objectSentCorrectly(String description, AsyncRequestBody body) throws Exception {
196+
mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();
197+
198+
ResponseInputStream<GetObjectResponse> objContent = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
199+
ResponseTransformer.toInputStream());
200+
assertThat(objContent.response().contentLength()).isEqualTo(0);
201+
assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedChecksumForEmptyString);
202+
}
203+
184204

185205
@ParameterizedTest
186206
@MethodSource("asyncRequestBodies")

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,11 @@ public CompletableFuture<PutObjectResponse> uploadObject(PutObjectRequest putObj
9292
return returnFuture;
9393
}
9494

95-
private class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> {
95+
final class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> {
9696
/**
9797
* Indicates whether this is the first async request body or not.
9898
*/
99-
private final AtomicBoolean isFirstAsyncRequestBody = new AtomicBoolean(true);
99+
private final AtomicBoolean firstAsyncRequestBodyReceived = new AtomicBoolean(false);
100100

101101
/**
102102
* Indicates whether CreateMultipartUpload has been initiated or not
@@ -163,9 +163,17 @@ public void onSubscribe(Subscription s) {
163163

164164
@Override
165165
public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
166+
if (asyncRequestBody == null) {
167+
NullPointerException exception = new NullPointerException("asyncRequestBody passed to onNext MUST NOT be null.");
168+
multipartUploadHelper.failRequestsElegantly(futures,
169+
exception, uploadId, returnFuture, putObjectRequest);
170+
throw exception;
171+
}
172+
166173
if (isDone) {
167174
return;
168175
}
176+
169177
int currentPartNum = partNumber.incrementAndGet();
170178
log.trace(() -> "Received asyncRequestBody " + asyncRequestBody.contentLength());
171179
asyncRequestBodyInFlight.incrementAndGet();
@@ -178,7 +186,7 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
178186
return;
179187
}
180188

181-
if (isFirstAsyncRequestBody.compareAndSet(true, false)) {
189+
if (firstAsyncRequestBodyReceived.compareAndSet(false, true)) {
182190
log.trace(() -> "Received first async request body");
183191
// If this is the first AsyncRequestBody received, request another one because we don't know if there is more
184192
firstRequestBody = asyncRequestBody;
@@ -276,10 +284,13 @@ public void onError(Throwable t) {
276284
@Override
277285
public void onComplete() {
278286
log.debug(() -> "Received onComplete()");
279-
// If CreateMultipartUpload has not been initiated at this point, we know this is a single object upload
287+
// If CreateMultipartUpload has not been initiated at this point, we know this is a single object upload, and if no
288+
// async request body has been received, it's an empty stream
280289
if (createMultipartUploadInitiated.get() == false) {
281290
log.debug(() -> "Starting the upload as a single object upload request");
282-
multipartUploadHelper.uploadInOneChunk(putObjectRequest, firstRequestBody, returnFuture);
291+
AsyncRequestBody entireRequestBody = firstAsyncRequestBodyReceived.get() ? firstRequestBody :
292+
AsyncRequestBody.empty();
293+
multipartUploadHelper.uploadInOneChunk(putObjectRequest, entireRequestBody, returnFuture);
283294
} else {
284295
isDone = true;
285296
completeMultipartUploadIfFinish(asyncRequestBodyInFlight.get());

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelperTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.services.s3.internal.multipart;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1920
import static org.mockito.ArgumentMatchers.any;
2021
import static org.mockito.ArgumentMatchers.eq;
2122
import static org.mockito.Mockito.mock;
@@ -24,6 +25,7 @@
2425
import static org.mockito.Mockito.when;
2526
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulCompleteMultipartCall;
2627
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulCreateMultipartCall;
28+
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulPutObjectCall;
2729
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulUploadPartCalls;
2830

2931
import java.io.FileInputStream;
@@ -55,6 +57,7 @@
5557
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
5658
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
5759
import software.amazon.awssdk.testutils.RandomTempFile;
60+
import software.amazon.awssdk.utils.StringInputStream;
5861

5962
public class UploadWithUnknownContentLengthHelperTest {
6063
private static final String BUCKET = "bucket";
@@ -116,13 +119,63 @@ void uploadObject_withMissingContentLength_shouldFailRequest() {
116119
verifyFailureWithMessage(future, "Content length is missing on the AsyncRequestBody for part number");
117120
}
118121

122+
@Test
123+
void uploadObject_emptyBody_shouldSucceed() {
124+
stubSuccessfulPutObjectCall(s3AsyncClient);
125+
126+
BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(null);
127+
CompletableFuture<PutObjectResponse> future = helper.uploadObject(createPutObjectRequest(), body);
128+
body.writeInputStream(new StringInputStream(""));
129+
future.join();
130+
131+
ArgumentCaptor<PutObjectRequest> requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
132+
ArgumentCaptor<AsyncRequestBody> requestBodyArgumentCaptor = ArgumentCaptor.forClass(AsyncRequestBody.class);
133+
verify(s3AsyncClient, times(1)).putObject(requestArgumentCaptor.capture(),
134+
requestBodyArgumentCaptor.capture());
135+
136+
List<PutObjectRequest> actualRequests = requestArgumentCaptor.getAllValues();
137+
List<AsyncRequestBody> actualRequestBodies = requestBodyArgumentCaptor.getAllValues();
138+
assertThat(actualRequestBodies).hasSize(1);
139+
assertThat(actualRequests).hasSize(1);
140+
141+
PutObjectRequest putObjectRequest = actualRequests.get(0);
142+
assertThat(putObjectRequest.bucket()).isEqualTo(BUCKET);
143+
assertThat(putObjectRequest.key()).isEqualTo(KEY);
144+
assertThat(actualRequestBodies.get(0).contentLength()).hasValue(0L);
145+
}
146+
119147
@Test
120148
void uploadObject_withPartSizeExceedingLimit_shouldFailRequest() {
121149
CloseableAsyncRequestBody asyncRequestBody = createMockAsyncRequestBody(PART_SIZE + 1);
122150
CompletableFuture<PutObjectResponse> future = setupAndTriggerUploadFailure(asyncRequestBody);
123151
verifyFailureWithMessage(future, "Content length must not be greater than part size");
124152
}
125153

154+
@Test
155+
void uploadObject_nullAsyncRequestBody_shouldFailRequest() {
156+
CloseableAsyncRequestBody asyncRequestBody = createMockAsyncRequestBody(PART_SIZE);
157+
SdkPublisher<CloseableAsyncRequestBody> mockPublisher = mock(SdkPublisher.class);
158+
when(asyncRequestBody.splitCloseable(any(Consumer.class))).thenReturn(mockPublisher);
159+
160+
ArgumentCaptor<Subscriber<CloseableAsyncRequestBody>> subscriberCaptor = ArgumentCaptor.forClass(Subscriber.class);
161+
CompletableFuture<PutObjectResponse> future = helper.uploadObject(createPutObjectRequest(), asyncRequestBody);
162+
163+
verify(mockPublisher).subscribe(subscriberCaptor.capture());
164+
Subscriber<CloseableAsyncRequestBody> subscriber = subscriberCaptor.getValue();
165+
166+
Subscription subscription = mock(Subscription.class);
167+
subscriber.onSubscribe(subscription);
168+
assertThatThrownBy(() -> subscriber.onNext(null)).isInstanceOf(NullPointerException.class).hasMessageContaining(
169+
"asyncRequestBody");
170+
171+
assertThat(future).isCompletedExceptionally();
172+
future.exceptionally(throwable -> {
173+
assertThat(throwable.getCause()).isInstanceOf(NullPointerException.class);
174+
assertThat(throwable.getCause().getMessage()).contains("MUST NOT be null");
175+
return null;
176+
}).join();
177+
}
178+
126179
private PutObjectRequest createPutObjectRequest() {
127180
return PutObjectRequest.builder()
128181
.bucket(BUCKET)

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/utils/MultipartUploadTestUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
3030
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
3131
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
32+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
33+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
3234
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
3335
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
3436
import software.amazon.awssdk.services.s3.multipart.S3ResumeToken;
@@ -69,6 +71,23 @@ public static void stubSuccessfulCompleteMultipartCall(String bucket, String key
6971
.thenReturn(completeMultipartUploadFuture);
7072
}
7173

74+
public static void stubSuccessfulPutObjectCall(S3AsyncClient s3AsyncClient) {
75+
when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)))
76+
.thenAnswer(new Answer<CompletableFuture<PutObjectResponse>>() {
77+
78+
@Override
79+
public CompletableFuture<PutObjectResponse> answer(InvocationOnMock invocationOnMock) {
80+
AsyncRequestBody asyncRequestBody = invocationOnMock.getArgument(1);
81+
// Draining the request body
82+
asyncRequestBody.subscribe(b -> {
83+
});
84+
85+
return CompletableFuture.completedFuture(PutObjectResponse.builder()
86+
.build());
87+
}
88+
});
89+
}
90+
7291
public static void stubSuccessfulUploadPartCalls(S3AsyncClient s3AsyncClient) {
7392
when(s3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)))
7493
.thenAnswer(new Answer<CompletableFuture<UploadPartResponse>>() {

0 commit comments

Comments
 (0)