diff --git a/src/main/java/com/cognite/client/Files.java b/src/main/java/com/cognite/client/Files.java index f708e337..e5fc578c 100644 --- a/src/main/java/com/cognite/client/Files.java +++ b/src/main/java/com/cognite/client/Files.java @@ -26,8 +26,10 @@ import com.cognite.client.servicesV1.parser.ItemParser; import com.cognite.client.util.Partition; import com.google.auto.value.AutoValue; +import com.google.cloud.Tuple; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import okhttp3.internal.http2.StreamResetException; import org.apache.commons.lang3.RandomStringUtils; import org.jetbrains.annotations.NotNull; @@ -96,6 +98,11 @@ public Iterator> list() throws Exception { return this.list(Request.create()); } + public static Files create() { + return builder() + .build(); + } + /** * Returns all {@link FileMetadata} objects that matches the filters set in the {@link Request}. * @@ -187,57 +194,19 @@ public List upsert(List fileMetadataList) throws Exc ConnectorServiceV1.ItemWriter updateWriter = getClient().getConnectorService().updateFileHeaders(); ConnectorServiceV1.ItemWriter createWriter = getClient().getConnectorService().writeFileHeaders(); - // naive de-duplication based on ids - Map internalIdUpdateMap = new HashMap<>(1000); - Map externalIdUpdateMap = new HashMap<>(1000); - Map internalIdAssetsMap = new HashMap<>(50); - Map externalIdAssetsMap = new HashMap<>(50); - for (FileMetadata value : fileMetadataList) { - if (value.hasExternalId()) { - externalIdUpdateMap.put(value.getExternalId(), value); - } else if (value.hasId()) { - internalIdUpdateMap.put(value.getId(), value); - } else { - throw new Exception("File metadata item does not contain id nor externalId: " + value.toString()); - } - } - - // Check for files with >1k assets. Set the extra assets aside so we can add them in separate updates. 
- for (Long key : internalIdUpdateMap.keySet()) { - FileMetadata fileMetadata = internalIdUpdateMap.get(key); - if (fileMetadata.getAssetIdsCount() > 1000) { - internalIdUpdateMap.put(key, fileMetadata.toBuilder() - .clearAssetIds() - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(0,1000)) - .build()); - internalIdAssetsMap.put(key, FileMetadata.newBuilder() - .setId(fileMetadata.getId()) - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(1000, fileMetadata.getAssetIdsList().size())) - .build()); - } - } - for (String key : externalIdUpdateMap.keySet()) { - FileMetadata fileMetadata = externalIdUpdateMap.get(key); - if (fileMetadata.getAssetIdsCount() > 1000) { - externalIdUpdateMap.put(key, fileMetadata.toBuilder() - .clearAssetIds() - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(0,1000)) - .build()); - externalIdAssetsMap.put(key, FileMetadata.newBuilder() - .setExternalId(fileMetadata.getExternalId()) - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(1000, fileMetadata.getAssetIdsList().size())) - .build()); - } - } + // mapToId eliminates duplicates + final Collection uniqueFilesMetadata = mapToId(fileMetadataList).values(); // Combine the input into list - List elementListUpdate = new ArrayList<>(); + List elementListUpdate = uniqueFilesMetadata.stream() + .map(fileHeader -> fileHeader.toBuilder() + .clearAssetIds() + .addAllAssetIds(fileHeader.getAssetIdsList().subList(0, Math.min(1000, fileHeader.getAssetIdsCount()))) + .build()) + .collect(Collectors.toList()); List elementListCreate = new ArrayList<>(); List elementListCompleted = new ArrayList<>(); - elementListUpdate.addAll(externalIdUpdateMap.values()); - elementListUpdate.addAll(internalIdUpdateMap.values()); - /* The upsert loop. If there are items left to insert or update: 1.
Update elements @@ -288,17 +257,7 @@ public List upsert(List fileMetadataList) throws Exc LOG.debug(loggingPrefix + "Number of missing entries reported by CDF: {}", missing.size()); // Move missing items from update to the create request - Map itemsMap = mapToId(updateResponseMap.get(response)); - for (Item value : missing) { - if (value.getIdTypeCase() == Item.IdTypeCase.EXTERNAL_ID) { - elementListCreate.add(itemsMap.get(value.getExternalId())); - itemsMap.remove(value.getExternalId()); - } else if (value.getIdTypeCase() == Item.IdTypeCase.ID) { - elementListCreate.add(itemsMap.get(String.valueOf(value.getId()))); - itemsMap.remove(String.valueOf(value.getId())); - } - } - elementListUpdate.addAll(itemsMap.values()); // Add remaining items to be re-updated + repurposeMissingItems(elementListUpdate, elementListCreate, updateResponseMap, response, missing); } } } @@ -309,7 +268,7 @@ public List upsert(List fileMetadataList) throws Exc if (elementListCreate.isEmpty()) { LOG.debug(loggingPrefix + "Create items list is empty. 
Skipping create."); } else { - Map, FileMetadata> createResponseMap = + Map, List> createResponseMap = splitAndCreateFileMetadata(elementListCreate, createWriter); LOG.debug(loggingPrefix + "Completed create items requests for {} items across {} batches at duration {}", elementListCreate.size(), @@ -334,17 +293,7 @@ public List upsert(List fileMetadataList) throws Exc LOG.debug(loggingPrefix + "Number of duplicate entries reported by CDF: {}", duplicates.size()); // Move duplicates from insert to the update request - Map itemsMap = mapToId(ImmutableList.of(createResponseMap.get(response))); - for (Item value : duplicates) { - if (value.getIdTypeCase() == Item.IdTypeCase.EXTERNAL_ID) { - elementListUpdate.add(itemsMap.get(value.getExternalId())); - itemsMap.remove(value.getExternalId()); - } else if (value.getIdTypeCase() == Item.IdTypeCase.ID) { - elementListUpdate.add(itemsMap.get(String.valueOf(value.getId()))); - itemsMap.remove(String.valueOf(value.getId())); - } - } - elementListCreate.addAll(itemsMap.values()); // Add remaining items to be re-inserted + repurposeMissingItems(elementListCreate, elementListUpdate, createResponseMap, response, duplicates); } } } @@ -356,70 +305,22 @@ public List upsert(List fileMetadataList) throws Exc file create/update (all the code above) and subsequent update requests which add the remaining assetIds (code below). 
*/ - Map internalIdTempMap = new HashMap<>(); - Map externalIdTempMap = new HashMap<>(); - List elementListAssetUpdate = new ArrayList<>(); - while (internalIdAssetsMap.size() > 0 || externalIdAssetsMap.size() > 0) { + final List elementListAssetUpdate = uniqueFilesMetadata.stream() + .parallel() + .filter(fileMetadata -> fileMetadata.getAssetIdsList().size() > 1000) + .flatMap(fileMetadata -> Lists.partition(fileMetadata.getAssetIdsList(), 1000).stream() + .skip(1) // first 1000 was written already + .map(collection -> fileMetadata.toBuilder() + .clearAssetIds() + .addAllAssetIds(collection) + .build()) + ) + .collect(Collectors.toList()); + + if (elementListAssetUpdate.size() > 0) { LOG.info(loggingPrefix + "Some files have very high assetId cardinality (+1k). Adding assetId to " - + (internalIdAssetsMap.size() + externalIdAssetsMap.size()) + + elementListAssetUpdate.size() + " file(s)."); - internalIdUpdateMap.clear(); - externalIdUpdateMap.clear(); - internalIdTempMap.clear(); - externalIdTempMap.clear(); - - // Check for files with >1k remaining assets - for (Long key : internalIdAssetsMap.keySet()) { - FileMetadata fileMetadata = internalIdAssetsMap.get(key); - if (fileMetadata.getAssetIdsCount() > 1000) { - internalIdUpdateMap.put(key, fileMetadata.toBuilder() - .clearAssetIds() - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(0,1000)) - .build()); - internalIdTempMap.put(key, FileMetadata.newBuilder() - .setId(fileMetadata.getId()) - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(1000, fileMetadata.getAssetIdsList().size())) - .build()); - } else { - // The entire assetId list can be pushed in a single update - internalIdUpdateMap.put(key, fileMetadata); - } - } - internalIdAssetsMap.clear(); - internalIdAssetsMap.putAll(internalIdTempMap); - - for (String key : externalIdAssetsMap.keySet()) { - FileMetadata fileMetadata = externalIdAssetsMap.get(key); - if (fileMetadata.getAssetIdsCount() > 1000) { - externalIdUpdateMap.put(key, 
fileMetadata.toBuilder() - .clearAssetIds() - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(0,1000)) - .build()); - externalIdTempMap.put(key, FileMetadata.newBuilder() - .setExternalId(fileMetadata.getExternalId()) - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(1000, fileMetadata.getAssetIdsList().size())) - .build()); - } else { - // The entire assetId list can be pushed in a single update - externalIdUpdateMap.put(key, fileMetadata); - } - } - externalIdAssetsMap.clear(); - externalIdAssetsMap.putAll(externalIdTempMap); - - // prepare the update and send request - LOG.info(loggingPrefix + "Building update request to add assetIds for {} files.", - internalIdUpdateMap.size() + externalIdUpdateMap.size()); - elementListAssetUpdate.clear(); - elementListAssetUpdate.addAll(externalIdUpdateMap.values()); - elementListAssetUpdate.addAll(internalIdUpdateMap.values()); - - // should not happen, but need to check - if (elementListAssetUpdate.isEmpty()) { - String message = loggingPrefix + "Internal error. Not able to send assetId update. 
The payload is empty."; - LOG.error(message); - throw new Exception(message); - } Map, List> responseItemsAssets = splitAndAddAssets(elementListAssetUpdate, updateWriter); @@ -451,13 +352,29 @@ public List upsert(List fileMetadataList) throws Exc elementListCompleted.size())); } - - return elementListCompleted.stream() .map(this::parseFileMetadata) .collect(Collectors.toList()); } + private void repurposeMissingItems(List elementListUpdate, + List elementListCreate, + Map, List> updateResponseMap, + ResponseItems response, + List missing) { + Map itemsMap = mapToId(updateResponseMap.get(response)); + for (Item value : missing) { + if (value.getIdTypeCase() == Item.IdTypeCase.EXTERNAL_ID) { + elementListCreate.add(itemsMap.get(value.getExternalId())); + itemsMap.remove(value.getExternalId()); + } else if (value.getIdTypeCase() == Item.IdTypeCase.ID) { + elementListCreate.add(itemsMap.get(String.valueOf(value.getId()))); + itemsMap.remove(String.valueOf(value.getId())); + } + } + elementListUpdate.addAll(itemsMap.values()); // Add remaining items to be re-updated + } + /** * Uploads a set of file headers and binaries to Cognite Data Fusion. * @@ -532,8 +449,7 @@ public List upload(@NotNull List files, boolean del // Must unwrap the completion exception Throwable cause = e.getCause(); - if (RETRYABLE_EXCEPTIONS_BINARY_UPLOAD.stream() - .anyMatch(retryable -> retryable.isInstance(cause))) { + if (RETRYABLE_EXCEPTIONS_BINARY_UPLOAD.stream().anyMatch(retryable -> retryable.isInstance(cause))) { // The API is most likely saturated. Retry the uploads one file at a time. LOG.warn(batchLoggingPrefix + "Error when uploading the batch of file binaries. 
Will retry each file individually."); for (FileContainer file : uploadBatch) { @@ -787,8 +703,8 @@ public List downloadFileBinaries(List fileItems, } LOG.debug(loggingPrefix + "Removing duplicates and missing items and retrying the request"); List duplicates = ItemParser.parseItems(responseBatch.get(0).getDuplicateItems()); - List missing = new ArrayList(); // Must define this as an explicit List for it to be mutable - missing.addAll(ItemParser.parseItems(responseBatch.get(0).getMissingItems())); + // Must define this as an explicit List for it to be mutable + List missing = new ArrayList(ItemParser.parseItems(responseBatch.get(0).getMissingItems())); LOG.debug(loggingPrefix + "No of duplicates reported: {}", duplicates.size()); LOG.debug(loggingPrefix + "No of missing items reported: {}", missing.size()); @@ -988,32 +904,23 @@ private Map>, List> splitAndDownloadFileBin return responseMap; } - /** - * Update file metadata items. - * - * Submits a (large) batch of items by splitting it up into multiple, parallel create / insert requests. - * The response from each request is returned along with the items used as input. - * - * @param fileMetadataList the objects to create/insert. - * @param updateWriter the ItemWriter to use for sending update requests - * @return a {@link Map} with the responses and request inputs. 
- * @throws Exception - */ - private Map, List> splitAndUpdateFileMetadata(List fileMetadataList, - ConnectorServiceV1.ItemWriter updateWriter) throws Exception { + private Map, List> splitAndAct( + List fileMetadataList, + ConnectorServiceV1.ItemWriter updateWriter, + FileAction action + ) throws Exception { + Map>, List> responseMap = new HashMap<>(); List> batches = Partition.ofSize(fileMetadataList, MAX_WRITE_REQUEST_BATCH_SIZE); // Submit all batches for (List fileBatch : batches) { - responseMap.put(updateFileMetadata(fileBatch, updateWriter), fileBatch); + responseMap.put(action.apply(fileBatch, updateWriter), fileBatch); } // Wait for all requests futures to complete - List>> futureList = new ArrayList<>(); - responseMap.keySet().forEach(future -> futureList.add(future)); CompletableFuture allFutures = - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])); + CompletableFuture.allOf(responseMap.keySet().toArray(new CompletableFuture[0])); allFutures.join(); // Wait for all futures to complete // Collect the responses from the futures @@ -1026,40 +933,37 @@ private Map, List> splitAndUpdateFileMetadat } /** - * Adds asset ids to existing file metadata objects. + * Update file metadata items. * * Submits a (large) batch of items by splitting it up into multiple, parallel create / insert requests. * The response from each request is returned along with the items used as input. * * @param fileMetadataList the objects to create/insert. - * @param updateWriter the ItemWriter to use for sending update requests + * @param updateWriter the ItemWriter to use for sending update requests * @return a {@link Map} with the responses and request inputs. 
* @throws Exception */ - private Map, List> splitAndAddAssets(List fileMetadataList, - ConnectorServiceV1.ItemWriter updateWriter) throws Exception { - Map>, List> responseMap = new HashMap<>(); - List> batches = Partition.ofSize(fileMetadataList, MAX_WRITE_REQUEST_BATCH_SIZE); - - // Submit all batches - for (List fileBatch : batches) { - responseMap.put(addFileAssets(fileBatch, updateWriter), fileBatch); - } - - // Wait for all requests futures to complete - List>> futureList = new ArrayList<>(); - responseMap.keySet().forEach(future -> futureList.add(future)); - CompletableFuture allFutures = - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])); - allFutures.join(); // Wait for all futures to complete - - // Collect the responses from the futures - Map, List> resultsMap = new HashMap<>(responseMap.size()); - for (Map.Entry>, List> entry : responseMap.entrySet()) { - resultsMap.put(entry.getKey().join(), entry.getValue()); - } + private Map, List> splitAndUpdateFileMetadata( + List fileMetadataList, + ConnectorServiceV1.ItemWriter updateWriter) throws Exception { + return splitAndAct(fileMetadataList, updateWriter, this::updateFileMetadata); + } - return resultsMap; + /** + * Adds asset ids to existing file metadata objects. + * + * Submits a (large) batch of items by splitting it up into multiple, parallel create / insert requests. + * The response from each request is returned along with the items used as input. + * + * @param fileMetadataList the objects to create/insert. + * @param updateWriter the ItemWriter to use for sending update requests + * @return a {@link Map} with the responses and request inputs. 
+ * @throws Exception + */ + private Map, List> splitAndAddAssets( + List fileMetadataList, + ConnectorServiceV1.ItemWriter updateWriter) throws Exception { + return splitAndAct(fileMetadataList, updateWriter, this::addFileAssets); } /** @@ -1073,29 +977,16 @@ private Map, List> splitAndAddAssets(List, FileMetadata> splitAndCreateFileMetadata(List fileMetadataList, - ConnectorServiceV1.ItemWriter createWriter) throws Exception { - Map>, FileMetadata> responseMap = new HashMap<>(); - - // Submit all batches - for (FileMetadata file : fileMetadataList) { - responseMap.put(createFileMetadata(file, createWriter), file); + private Map, List> splitAndCreateFileMetadata( + List fileMetadataList, + ConnectorServiceV1.ItemWriter createWriter) throws Exception { + + // files API doesn't have batching support and accept single item at a time + Map, List> output = new HashMap<>(); + for(FileMetadata fileMetadata: fileMetadataList) { + output.putAll(splitAndAct(ImmutableList.of(fileMetadata), createWriter, this::createFileMetadata)); } - - // Wait for all requests futures to complete - List>> futureList = new ArrayList<>(); - responseMap.keySet().forEach(future -> futureList.add(future)); - CompletableFuture allFutures = - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])); - allFutures.join(); // Wait for all futures to complete - - // Collect the responses from the futures - Map, FileMetadata> resultsMap = new HashMap<>(responseMap.size()); - for (Map.Entry>, FileMetadata> entry : responseMap.entrySet()) { - resultsMap.put(entry.getKey().join(), entry.getValue()); - } - - return resultsMap; + return output; } /** @@ -1106,24 +997,30 @@ private Map, FileMetadata> splitAndCreateFileMetadata(List * split the input into multiple batches. If you have a large batch of {@link FileMetadata} that * you would like to split across multiple requests, use the {@code splitAndCreateFileMetadata} method. 
* - * @param fileMetadata + * @param filesBatch * @param fileWriter * @return * @throws Exception */ - private CompletableFuture> createFileMetadata(FileMetadata fileMetadata, - ConnectorServiceV1.ItemWriter fileWriter) throws Exception { + private CompletableFuture> createFileMetadata(Collection filesBatch, + ConnectorServiceV1.ItemWriter fileWriter) throws Exception { String loggingPrefix = "createFileMetadata() - "; LOG.debug(loggingPrefix + "Received file metadata item / header to create."); - + // files endpoint doesn't support batching + Preconditions.checkArgument(filesBatch.size() == 1); // build request object + FileMetadata fileMetadata = filesBatch.stream().findFirst().orElseThrow(); Request postSeqBody = addAuthInfo(Request.create() .withRequestParameters(toRequestInsertItem(fileMetadata))); - // post write request return fileWriter.writeItemsAsync(postSeqBody); } + @FunctionalInterface + public interface FileAction { + CompletableFuture> apply(Collection c, ConnectorServiceV1.ItemWriter w) throws Exception; + } + /** * Post a collection of {@link FileMetadata} update request on a separate thread. The response is wrapped in a * {@link CompletableFuture} that is returned immediately to the caller. 
@@ -1197,17 +1094,9 @@ private CompletableFuture> addFileAssets(Collection mapToId(List fileMetadataList) { - Map idMap = new HashMap<>(); - for (FileMetadata fileMetadata : fileMetadataList) { - if (fileMetadata.hasExternalId()) { - idMap.put(fileMetadata.getExternalId(), fileMetadata); - } else if (fileMetadata.hasId()) { - idMap.put(String.valueOf(fileMetadata.getId()), fileMetadata); - } else { - idMap.put("", fileMetadata); - } - } - return idMap; + return fileMetadataList.stream() + .map(fileMetadata -> Tuple.of(getFileId(fileMetadata).orElse(""), fileMetadata)) + .collect(Collectors.toMap(Tuple::x, Tuple::y, (o1, o2) -> o1)); // remove duplicates by picking random } /* diff --git a/src/main/java/com/cognite/client/servicesV1/ConnectorServiceV1.java b/src/main/java/com/cognite/client/servicesV1/ConnectorServiceV1.java index 71b6aec9..3b16a97c 100644 --- a/src/main/java/com/cognite/client/servicesV1/ConnectorServiceV1.java +++ b/src/main/java/com/cognite/client/servicesV1/ConnectorServiceV1.java @@ -1049,7 +1049,7 @@ public ItemWriter writeFileHeaders() { FilesUploadRequestProvider requestProvider = FilesUploadRequestProvider.builder() .setEndpoint("files") - .setRequest(Request.create()) + .setRequest(Request.create().withRootParameter("overwrite", true)) .setSdkIdentifier(getClient().getClientConfig().getSdkIdentifier()) .setAppIdentifier(getClient().getClientConfig().getAppIdentifier()) .setSessionIdentifier(getClient().getClientConfig().getSessionIdentifier()) diff --git a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java index 1f3b1f4e..e9a3ef2a 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java @@ -50,6 +50,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import 
java.util.List; +import java.util.Objects; import java.util.concurrent.*; /** @@ -282,7 +283,7 @@ public FileBinary downloadBinary(Request request) throws Exception { if (!response.isSuccessful() && !getValidResponseCodes().contains(responseCode)) { String errorMessage = "Downloading file binary: Unexpected response code: " + responseCode + ". " + response.toString() + System.lineSeparator() - + "Response body: " + response.body().string() + System.lineSeparator() + + "Response body: " + Objects.requireNonNull(response.body()).string() + System.lineSeparator() + "Response headers: " + response.headers().toString() + System.lineSeparator(); if (responseCode >= 400 && responseCode < 500) { @@ -296,35 +297,35 @@ public FileBinary downloadBinary(Request request) throws Exception { // check the response if (response.body() == null) { throw new Exception(loggingPrefix + "Successful response, but the body is null. " - + response.toString() + System.lineSeparator() - + "Response headers: " + response.headers().toString()); + + response.toString() + System.lineSeparator() + + "Response headers: " + response.headers().toString()); } // check the content length. When downloading files >200MiB we will use temp storage. - if (response.body().contentLength() > (1024L * 1024L * 200L) || isForceTempStorage()) { + final long contentLength = Objects.requireNonNull(response.body()).contentLength(); + if (contentLength > (1024L * 1024L * 200L) || isForceTempStorage()) { if (null == getTempStoragePath()) { String message = String.format("File too large to download to memory. Consider configuring temp " + "storage on the file reader.%n" + "Content-length = [%d]. 
%n" + "Response headers: %s", - response.body().contentLength(), + contentLength, response.headers().toString()); throw new IOException(message); } LOG.info("Downloading {} MiB to temp storage binary.", - String.format("%.2f", response.body().contentLength() / (1024d * 1024d))); + String.format("%.2f", contentLength / (1024d * 1024d))); return downloadBinaryToTempStorage(response); } else { LOG.info("Downloading {} MiB to memory-based binary.", - String.format("%.2f", response.body().contentLength() / (1024d * 1024d))); + String.format("%.2f", contentLength / (1024d * 1024d))); return FileBinary.newBuilder() - .setBinary(ByteString.readFrom(response.body().byteStream())) - .setContentLength(response.body().contentLength()) + .setBinary(ByteString.readFrom(Objects.requireNonNull(response.body()).byteStream())) + .setContentLength(contentLength) .build(); } } catch (Exception e) { catchedExceptions.add(e); - // if we get a transient error, retry the call if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { @@ -464,7 +465,7 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws if (!response.isSuccessful() && !getValidResponseCodes().contains(responseCode)) { String errorMessage = "Uploading file binary: Unexpected response code: " + responseCode + ". " + response.toString() + System.lineSeparator() - + "Response body: " + response.body().string() + System.lineSeparator() + + "Response body: " + Objects.requireNonNull(response.body()).string() + System.lineSeparator() + "Response headers: " + response.headers().toString() + System.lineSeparator(); throw new IOException(errorMessage); @@ -478,11 +479,12 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws // check the response content length. 
When downloading very large files this may exceed 4GB // we put a limit of 200MiB on the response - if (response.body().contentLength() > (1024L * 1024L * 200L)) { + final long contentLength = Objects.requireNonNull(response.body()).contentLength(); + if (contentLength > (1024L * 1024L * 200L)) { String message = String.format("Response too large. " + "Content-length = [%d]. %n" + "Response headers: %s", - response.body().contentLength(), + contentLength, response.headers().toString()); throw new IOException(message); } @@ -492,7 +494,6 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws .withApiRetryCounter(apiRetryCounter); } catch (Exception e) { catchedExceptions.add(e); - // if we get a transient error, retry the call if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { @@ -570,7 +571,7 @@ && getTempStoragePath().getScheme().equalsIgnoreCase("gs")) { } catch (Exception e) { // remove the temp file cloudStorage.delete(blobId); throw e; } diff --git a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java index 9a179044..52f936c6 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java @@ -33,10 +33,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.*; /** * This class will execute an okhttp3 request on a separate thread and publish the result via a * @@ -243,7 +240,6 @@ public ResponseBinary executeRequest(Request request) throws Exception {
catchedExceptions.add(e); - // if we get a transient error, retry the call if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) {