Skip to content

Commit 0afc13f

Browse files
committed
use extensible supplyAsync() and newCompleteableFuture()
Signed-off-by: Bala.FA <bala@minio.io>
1 parent 5671060 commit 0afc13f

File tree

2 files changed

+16
-7
lines changed

2 files changed

+16
-7
lines changed

api/src/main/java/io/minio/BaseS3Client.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.concurrent.CompletableFuture;
6363
import java.util.concurrent.CompletionException;
6464
import java.util.concurrent.ConcurrentHashMap;
65+
import java.util.function.Supplier;
6566
import java.util.logging.Logger;
6667
import javax.annotation.Nonnull;
6768
import okhttp3.Call;
@@ -230,6 +231,14 @@ public void setAwsS3Prefix(@Nonnull String awsS3Prefix) {
230231
///////////////////////////////////// HTTP execution methods ////////////////////////////////////
231232
/////////////////////////////////////////////////////////////////////////////////////////////////
232233

234+
protected <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
235+
return CompletableFuture.supplyAsync(supplier);
236+
}
237+
238+
protected <T> CompletableFuture<T> newCompleteableFuture() {
239+
return new CompletableFuture<>();
240+
}
241+
233242
private String[] handleRedirectResponse(
234243
Http.Method method, String bucketName, Response response, boolean retry) {
235244
String code = null;
@@ -282,7 +291,7 @@ protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, Str
282291
// }
283292

284293
okhttp3.Request httpRequest = request.httpRequest();
285-
CompletableFuture<Response> completableFuture = new CompletableFuture<>();
294+
CompletableFuture<Response> completableFuture = newCompleteableFuture();
286295
httpClient
287296
.newCall(httpRequest)
288297
.enqueue(

api/src/main/java/io/minio/MinioAsyncClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ args, new SourceObject(args.source(), response.size(), response.etag())))
562562
protected CompletableFuture<Integer> calculatePartCount(List<SourceObject> sources) {
563563
long[] objectSize = {0};
564564

565-
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 0);
565+
CompletableFuture<Integer> completableFuture = supplyAsync(() -> 0);
566566

567567
int sourceSize = sources.size();
568568
for (int i = 0; i < sourceSize; i++) {
@@ -650,7 +650,7 @@ private CompletableFuture<Part[]> uploadParts(
650650
: null;
651651

652652
int partNumber = 0;
653-
CompletableFuture<Part[]> future = CompletableFuture.supplyAsync(() -> new Part[partCount]);
653+
CompletableFuture<Part[]> future = supplyAsync(() -> new Part[partCount]);
654654
for (SourceObject source : args.sources()) {
655655
long size = source.objectSize();
656656
if (source.length() != null) {
@@ -1753,7 +1753,7 @@ private CompletableFuture<List<UploadPartResponse>> uploadPartsSequentially(
17531753
ByteBuffer buffer,
17541754
long partSize,
17551755
List<UploadPartResponse> responses) {
1756-
return CompletableFuture.supplyAsync(
1756+
return supplyAsync(
17571757
() ->
17581758
new UploadPartArgs(
17591759
args,
@@ -1804,7 +1804,7 @@ private CompletableFuture<List<UploadPartResponse>> uploadPartsParallelly(
18041804
AtomicBoolean errorOccurred = new AtomicBoolean(false);
18051805
ConcurrentLinkedQueue<Throwable> exceptions = new ConcurrentLinkedQueue<>();
18061806

1807-
return CompletableFuture.supplyAsync(
1807+
return supplyAsync(
18081808
() -> {
18091809
try {
18101810
// Start uploader workers
@@ -3197,7 +3197,7 @@ public CompletableFuture<ObjectWriteResponse> uploadSnowballObjects(
31973197
UploadSnowballObjectsArgs args) {
31983198
checkArgs(args);
31993199

3200-
return CompletableFuture.supplyAsync(
3200+
return supplyAsync(
32013201
() -> {
32023202
FileOutputStream fos = null;
32033203
BufferedOutputStream bos = null;
@@ -3309,7 +3309,7 @@ public CompletableFuture<PutObjectFanOutResponse> putObjectFanOut(PutObjectFanOu
33093309
checkArgs(args);
33103310
args.validateSse(this.baseUrl.isHttps());
33113311

3312-
return CompletableFuture.supplyAsync(
3312+
return supplyAsync(
33133313
() -> {
33143314
byte[] buf16k = new byte[16384]; // 16KiB buffer for optimization.
33153315
ByteBuffer buffer = new ByteBuffer(args.size());

0 commit comments

Comments
 (0)