From 90bf388c25d8070af53acef4b59f654646a33a4e Mon Sep 17 00:00:00 2001 From: Pavel Zubarev Date: Tue, 23 Nov 2021 00:27:11 +0100 Subject: [PATCH 1/9] Handle both cases with Async and Sync calls --- .../executor/FileBinaryRequestExecutor.java | 42 +++++++++++-------- .../servicesV1/executor/RequestExecutor.java | 12 +++--- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java index d05a1906..b213a3a3 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java @@ -50,6 +50,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.*; /** @@ -282,7 +283,7 @@ public FileBinary downloadBinary(Request request) throws Exception { if (!response.isSuccessful() && !getValidResponseCodes().contains(responseCode)) { String errorMessage = "Downloading file binary: Unexpected response code: " + responseCode + ". " + response.toString() + System.lineSeparator() - + "Response body: " + response.body().string() + System.lineSeparator() + + "Response body: " + Objects.requireNonNull(response.body()).string() + System.lineSeparator() + "Response headers: " + response.headers().toString() + System.lineSeparator(); if (responseCode >= 400 && responseCode < 500) { @@ -296,37 +297,41 @@ public FileBinary downloadBinary(Request request) throws Exception { // check the response if (response.body() == null) { throw new Exception(loggingPrefix + "Successful response, but the body is null. 
" - + response.toString() + System.lineSeparator() - + "Response headers: " + response.headers().toString()); + + response.toString() + System.lineSeparator() + + "Response headers: " + response.headers().toString()); } // check the content length. When downloading files >200MiB we will use temp storage. - if (response.body().contentLength() > (1024L * 1024L * 200L) || isForceTempStorage()) { + final long contentLength = Objects.requireNonNull(response.body()).contentLength(); + if (contentLength > (1024L * 1024L * 200L) || isForceTempStorage()) { if (null == getTempStoragePath()) { String message = String.format("File too large to download to memory. Consider configuring temp " + "storage on the file reader.%n" + "Content-length = [%d]. %n" + "Response headers: %s", - response.body().contentLength(), + contentLength, response.headers().toString()); throw new IOException(message); } LOG.info("Downloading {} MiB to temp storage binary.", - String.format("%.2f", response.body().contentLength() / (1024d * 1024d))); + String.format("%.2f", contentLength / (1024d * 1024d))); return downloadBinaryToTempStorage(response); } else { LOG.info("Downloading {} MiB to memory-based binary.", - String.format("%.2f", response.body().contentLength() / (1024d * 1024d))); + String.format("%.2f", contentLength / (1024d * 1024d))); return FileBinary.newBuilder() - .setBinary(ByteString.readFrom(response.body().byteStream())) - .setContentLength(response.body().contentLength()) + .setBinary(ByteString.readFrom(Objects.requireNonNull(response.body()).byteStream())) + .setContentLength(contentLength) .build(); } } catch (Exception e) { catchedExceptions.add(e); - + // depends on the execution path either of two may happen: + // 1. async call failed and cause is packed with CompleteException + // 2. sync call (small file) throws a direct exception + final Throwable cause = e instanceof CompletionException ? 
e.getCause() : e; // if we get a transient error, retry the call - if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e.getClass())) + if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { LOG.warn(loggingPrefix + "Transient error when downloading file (" + "response code: " + responseCode @@ -464,7 +469,7 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws if (!response.isSuccessful() && !getValidResponseCodes().contains(responseCode)) { String errorMessage = "Uploading file binary: Unexpected response code: " + responseCode + ". " + response.toString() + System.lineSeparator() - + "Response body: " + response.body().string() + System.lineSeparator() + + "Response body: " + Objects.requireNonNull(response.body()).string() + System.lineSeparator() + "Response headers: " + response.headers().toString() + System.lineSeparator(); throw new IOException(errorMessage); @@ -478,11 +483,12 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws // check the response content length. When downloading very large files this may exceed 4GB // we put a limit of 200MiB on the response - if (response.body().contentLength() > (1024L * 1024L * 200L)) { + final long contentLength = Objects.requireNonNull(response.body()).contentLength(); + if (contentLength > (1024L * 1024L * 200L)) { String message = String.format("Response too large. " + "Content-length = [%d]. %n" + "Response headers: %s", - response.body().contentLength(), + contentLength, response.headers().toString()); throw new IOException(message); } @@ -492,9 +498,12 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws .withApiRetryCounter(apiRetryCounter); } catch (Exception e) { catchedExceptions.add(e); - + // depends on the execution path either of two may happen: + // 1. async call failed and cause is packed with CompleteException + // 2. 
sync call (small file) throws a direct exception + final Throwable cause = e instanceof CompletionException ? e.getCause() : e; // if we get a transient error, retry the call - if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e.getClass())) + if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause.getClass())) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { apiRetryCounter++; LOG.warn(loggingPrefix + "Transient error when reading from Fusion (request id: " + requestId @@ -570,7 +579,6 @@ && getTempStoragePath().getScheme().equalsIgnoreCase("gs")) { } catch (Exception e) { // remove the temp file cloudStorage.delete(blobId); - throw e; } diff --git a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java index 4b3d3b7d..3e307a93 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java @@ -33,10 +33,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.*; /** * This class will execute an okhttp3 request on a separate thread and publish the result via a @@ -243,9 +240,12 @@ public ResponseBinary executeRequest(Request request) throws Exception { .withApiRetryCounter(apiRetryCounter); } catch (Exception e) { catchedExceptions.add(e); - + // depends on the execution path either of two may happen: + // 1. async call failed and cause is packed with CompleteException + // 2. sync call (small file) throws a direct exception + final Throwable cause = e instanceof CompletionException ? 
e.getCause() : e; // if we get a transient error, retry the call - if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e.getClass())) + if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause.getClass())) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { apiRetryCounter++; LOG.warn(loggingPrefix + "Transient error when reading from Fusion (request id: " + requestId From 357d8f895602aed7176673004adb4e0eb3558658 Mon Sep 17 00:00:00 2001 From: Pavel Zubarev Date: Tue, 23 Nov 2021 00:30:39 +0100 Subject: [PATCH 2/9] comment --- .../client/servicesV1/executor/FileBinaryRequestExecutor.java | 4 ++-- .../cognite/client/servicesV1/executor/RequestExecutor.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java index b213a3a3..927cb291 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java @@ -327,7 +327,7 @@ public FileBinary downloadBinary(Request request) throws Exception { } catch (Exception e) { catchedExceptions.add(e); // depends on the execution path either of two may happen: - // 1. async call failed and cause is packed with CompleteException + // 1. async call failed and cause is packed with CompletionException // 2. sync call (small file) throws a direct exception final Throwable cause = e instanceof CompletionException ? e.getCause() : e; // if we get a transient error, retry the call @@ -499,7 +499,7 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws } catch (Exception e) { catchedExceptions.add(e); // depends on the execution path either of two may happen: - // 1. async call failed and cause is packed with CompleteException + // 1. 
async call failed and cause is packed with CompletionException // 2. sync call (small file) throws a direct exception final Throwable cause = e instanceof CompletionException ? e.getCause() : e; // if we get a transient error, retry the call diff --git a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java index 3e307a93..688b9f3a 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java @@ -241,7 +241,7 @@ public ResponseBinary executeRequest(Request request) throws Exception { } catch (Exception e) { catchedExceptions.add(e); // depends on the execution path either of two may happen: - // 1. async call failed and cause is packed with CompleteException + // 1. async call failed and cause is packed with CompletionException // 2. sync call (small file) throws a direct exception final Throwable cause = e instanceof CompletionException ? e.getCause() : e; // if we get a transient error, retry the call From 84f6fe597625286302dbbc26aa8ee60f5c9fa826 Mon Sep 17 00:00:00 2001 From: Pavel Zubarev Date: Tue, 23 Nov 2021 00:37:38 +0100 Subject: [PATCH 3/9] use isInstance correctly --- .../client/servicesV1/executor/FileBinaryRequestExecutor.java | 2 +- .../com/cognite/client/servicesV1/executor/RequestExecutor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java index 927cb291..f4d22ccf 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java @@ -503,7 +503,7 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws // 2. 
sync call (small file) throws a direct exception final Throwable cause = e instanceof CompletionException ? e.getCause() : e; // if we get a transient error, retry the call - if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause.getClass())) + if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { apiRetryCounter++; LOG.warn(loggingPrefix + "Transient error when reading from Fusion (request id: " + requestId diff --git a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java index 688b9f3a..3809fcf6 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java @@ -245,7 +245,7 @@ public ResponseBinary executeRequest(Request request) throws Exception { // 2. sync call (small file) throws a direct exception final Throwable cause = e instanceof CompletionException ? e.getCause() : e; // if we get a transient error, retry the call - if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause.getClass())) + if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { apiRetryCounter++; LOG.warn(loggingPrefix + "Transient error when reading from Fusion (request id: " + requestId From 3a14a435c0a7286e75a8883695ffc5c4d7dfe0bd Mon Sep 17 00:00:00 2001 From: Kjetil Halvorsen Date: Tue, 23 Nov 2021 11:25:52 +0100 Subject: [PATCH 4/9] Fix. Instance of towards instance--not class. 
--- src/main/java/com/cognite/client/Files.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/cognite/client/Files.java b/src/main/java/com/cognite/client/Files.java index 57da87db..11221344 100644 --- a/src/main/java/com/cognite/client/Files.java +++ b/src/main/java/com/cognite/client/Files.java @@ -533,7 +533,7 @@ public List upload(@NotNull List files, boolean del Throwable cause = e.getCause(); if (RETRYABLE_EXCEPTIONS_BINARY_UPLOAD.stream() - .anyMatch(retryable -> retryable.isInstance(cause.getClass()))) { + .anyMatch(retryable -> retryable.isInstance(cause))) { // The API is most likely saturated. Retry the uploads one file at a time. LOG.warn(batchLoggingPrefix + "Error when uploading the batch of file binaries. Will retry each file individually."); for (FileContainer file : uploadBatch) { From c7a0d8048e9d0cce70b910834d7d0978cd8e0960 Mon Sep 17 00:00:00 2001 From: Pavel Zubarev Date: Tue, 23 Nov 2021 12:20:53 +0100 Subject: [PATCH 5/9] Kjetil's comments --- src/main/java/com/cognite/client/Files.java | 190 ++++++++---------- .../executor/FileBinaryRequestExecutor.java | 13 +- .../servicesV1/executor/RequestExecutor.java | 5 +- 3 files changed, 88 insertions(+), 120 deletions(-) diff --git a/src/main/java/com/cognite/client/Files.java b/src/main/java/com/cognite/client/Files.java index 11221344..6a532011 100644 --- a/src/main/java/com/cognite/client/Files.java +++ b/src/main/java/com/cognite/client/Files.java @@ -26,6 +26,7 @@ import com.cognite.client.servicesV1.parser.ItemParser; import com.cognite.client.util.Partition; import com.google.auto.value.AutoValue; +import com.google.cloud.Tuple; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import okhttp3.internal.http2.StreamResetException; @@ -96,6 +97,11 @@ public Iterator> list() throws Exception { return this.list(Request.create()); } + public static Files create() { + return builder() + .build(); + } + /** * Returns all 
{@link FileMetadata} objects that matches the filters set in the {@link Request}. * @@ -202,7 +208,7 @@ public List upsert(List fileMetadataList) throws Exc } } - // Check for files with >1k assets. Set the extra assets aside so we can add them in separate updates. + // Check for files with >1k assets. Set the extra assets aside, so we can add them in separate updates. for (Long key : internalIdUpdateMap.keySet()) { FileMetadata fileMetadata = internalIdUpdateMap.get(key); if (fileMetadata.getAssetIdsCount() > 1000) { @@ -288,17 +294,7 @@ public List upsert(List fileMetadataList) throws Exc LOG.debug(loggingPrefix + "Number of missing entries reported by CDF: {}", missing.size()); // Move missing items from update to the create request - Map itemsMap = mapToId(updateResponseMap.get(response)); - for (Item value : missing) { - if (value.getIdTypeCase() == Item.IdTypeCase.EXTERNAL_ID) { - elementListCreate.add(itemsMap.get(value.getExternalId())); - itemsMap.remove(value.getExternalId()); - } else if (value.getIdTypeCase() == Item.IdTypeCase.ID) { - elementListCreate.add(itemsMap.get(String.valueOf(value.getId()))); - itemsMap.remove(String.valueOf(value.getId())); - } - } - elementListUpdate.addAll(itemsMap.values()); // Add remaining items to be re-updated + repurposeMissingItems(elementListUpdate, elementListCreate, updateResponseMap, response, missing); } } } @@ -309,7 +305,7 @@ public List upsert(List fileMetadataList) throws Exc if (elementListCreate.isEmpty()) { LOG.debug(loggingPrefix + "Create items list is empty. 
Skipping create."); } else { - Map, FileMetadata> createResponseMap = + Map, List> createResponseMap = splitAndCreateFileMetadata(elementListCreate, createWriter); LOG.debug(loggingPrefix + "Completed create items requests for {} items across {} batches at duration {}", elementListCreate.size(), @@ -334,17 +330,7 @@ public List upsert(List fileMetadataList) throws Exc LOG.debug(loggingPrefix + "Number of duplicate entries reported by CDF: {}", duplicates.size()); // Move duplicates from insert to the update request - Map itemsMap = mapToId(ImmutableList.of(createResponseMap.get(response))); - for (Item value : duplicates) { - if (value.getIdTypeCase() == Item.IdTypeCase.EXTERNAL_ID) { - elementListUpdate.add(itemsMap.get(value.getExternalId())); - itemsMap.remove(value.getExternalId()); - } else if (value.getIdTypeCase() == Item.IdTypeCase.ID) { - elementListUpdate.add(itemsMap.get(String.valueOf(value.getId()))); - itemsMap.remove(String.valueOf(value.getId())); - } - } - elementListCreate.addAll(itemsMap.values()); // Add remaining items to be re-inserted + repurposeMissingItems(elementListCreate, elementListUpdate, createResponseMap, response, duplicates); } } } @@ -458,6 +444,24 @@ public List upsert(List fileMetadataList) throws Exc .collect(Collectors.toList()); } + private void repurposeMissingItems(List elementListUpdate, + List elementListCreate, + Map, List> updateResponseMap, + ResponseItems response, + List missing) { + Map itemsMap = mapToId(updateResponseMap.get(response)); + for (Item value : missing) { + if (value.getIdTypeCase() == Item.IdTypeCase.EXTERNAL_ID) { + elementListCreate.add(itemsMap.get(value.getExternalId())); + itemsMap.remove(value.getExternalId()); + } else if (value.getIdTypeCase() == Item.IdTypeCase.ID) { + elementListCreate.add(itemsMap.get(String.valueOf(value.getId()))); + itemsMap.remove(String.valueOf(value.getId())); + } + } + elementListUpdate.addAll(itemsMap.values()); // Add remaining items to be re-updated + } + /** * 
Uploads a set of file headers and binaries to Cognite Data Fusion. * @@ -532,8 +536,7 @@ public List upload(@NotNull List files, boolean del // Must unwrap the completion exception Throwable cause = e.getCause(); - if (RETRYABLE_EXCEPTIONS_BINARY_UPLOAD.stream() - .anyMatch(retryable -> retryable.isInstance(cause))) { + if (RETRYABLE_EXCEPTIONS_BINARY_UPLOAD.stream().anyMatch(retryable -> retryable.isInstance(cause))) { // The API is most likely saturated. Retry the uploads one file at a time. LOG.warn(batchLoggingPrefix + "Error when uploading the batch of file binaries. Will retry each file individually."); for (FileContainer file : uploadBatch) { @@ -786,8 +789,8 @@ public List downloadFileBinaries(List fileItems, } LOG.debug(loggingPrefix + "Removing duplicates and missing items and retrying the request"); List duplicates = ItemParser.parseItems(responseBatch.get(0).getDuplicateItems()); - List missing = new ArrayList(); // Must define this as an explicit List for it to be mutable - missing.addAll(ItemParser.parseItems(responseBatch.get(0).getMissingItems())); + // Must define this as an explicit List for it to be mutable + List missing = new ArrayList(ItemParser.parseItems(responseBatch.get(0).getMissingItems())); LOG.debug(loggingPrefix + "No of duplicates reported: {}", duplicates.size()); LOG.debug(loggingPrefix + "No of missing items reported: {}", missing.size()); @@ -987,32 +990,23 @@ private Map>, List> splitAndDownloadFileBin return responseMap; } - /** - * Update file metadata items. - * - * Submits a (large) batch of items by splitting it up into multiple, parallel create / insert requests. - * The response from each request is returned along with the items used as input. - * - * @param fileMetadataList the objects to create/insert. - * @param updateWriter the ItemWriter to use for sending update requests - * @return a {@link Map} with the responses and request inputs. 
- * @throws Exception - */ - private Map, List> splitAndUpdateFileMetadata(List fileMetadataList, - ConnectorServiceV1.ItemWriter updateWriter) throws Exception { + private Map, List> splitAndAct( + List fileMetadataList, + ConnectorServiceV1.ItemWriter updateWriter, + FileAction action + ) throws Exception { + Map>, List> responseMap = new HashMap<>(); List> batches = Partition.ofSize(fileMetadataList, MAX_WRITE_REQUEST_BATCH_SIZE); // Submit all batches for (List fileBatch : batches) { - responseMap.put(updateFileMetadata(fileBatch, updateWriter), fileBatch); + responseMap.put(action.apply(fileBatch, updateWriter), fileBatch); } // Wait for all requests futures to complete - List>> futureList = new ArrayList<>(); - responseMap.keySet().forEach(future -> futureList.add(future)); CompletableFuture allFutures = - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])); + CompletableFuture.allOf(responseMap.keySet().toArray(new CompletableFuture[0])); allFutures.join(); // Wait for all futures to complete // Collect the responses from the futures @@ -1025,40 +1019,37 @@ private Map, List> splitAndUpdateFileMetadat } /** - * Adds asset ids to existing file metadata objects. + * Update file metadata items. * * Submits a (large) batch of items by splitting it up into multiple, parallel create / insert requests. * The response from each request is returned along with the items used as input. * * @param fileMetadataList the objects to create/insert. - * @param updateWriter the ItemWriter to use for sending update requests + * @param updateWriter the ItemWriter to use for sending update requests * @return a {@link Map} with the responses and request inputs. 
* @throws Exception */ - private Map, List> splitAndAddAssets(List fileMetadataList, - ConnectorServiceV1.ItemWriter updateWriter) throws Exception { - Map>, List> responseMap = new HashMap<>(); - List> batches = Partition.ofSize(fileMetadataList, MAX_WRITE_REQUEST_BATCH_SIZE); - - // Submit all batches - for (List fileBatch : batches) { - responseMap.put(addFileAssets(fileBatch, updateWriter), fileBatch); - } - - // Wait for all requests futures to complete - List>> futureList = new ArrayList<>(); - responseMap.keySet().forEach(future -> futureList.add(future)); - CompletableFuture allFutures = - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])); - allFutures.join(); // Wait for all futures to complete - - // Collect the responses from the futures - Map, List> resultsMap = new HashMap<>(responseMap.size()); - for (Map.Entry>, List> entry : responseMap.entrySet()) { - resultsMap.put(entry.getKey().join(), entry.getValue()); - } + private Map, List> splitAndUpdateFileMetadata( + List fileMetadataList, + ConnectorServiceV1.ItemWriter updateWriter) throws Exception { + return splitAndAct(fileMetadataList, updateWriter, this::updateFileMetadata); + } - return resultsMap; + /** + * Adds asset ids to existing file metadata objects. + * + * Submits a (large) batch of items by splitting it up into multiple, parallel create / insert requests. + * The response from each request is returned along with the items used as input. + * + * @param fileMetadataList the objects to create/insert. + * @param updateWriter the ItemWriter to use for sending update requests + * @return a {@link Map} with the responses and request inputs. 
+ * @throws Exception + */ + private Map, List> splitAndAddAssets( + List fileMetadataList, + ConnectorServiceV1.ItemWriter updateWriter) throws Exception { + return splitAndAct(fileMetadataList, updateWriter, this::addFileAssets); } /** @@ -1072,29 +1063,10 @@ private Map, List> splitAndAddAssets(List, FileMetadata> splitAndCreateFileMetadata(List fileMetadataList, - ConnectorServiceV1.ItemWriter createWriter) throws Exception { - Map>, FileMetadata> responseMap = new HashMap<>(); - - // Submit all batches - for (FileMetadata file : fileMetadataList) { - responseMap.put(createFileMetadata(file, createWriter), file); - } - - // Wait for all requests futures to complete - List>> futureList = new ArrayList<>(); - responseMap.keySet().forEach(future -> futureList.add(future)); - CompletableFuture allFutures = - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])); - allFutures.join(); // Wait for all futures to complete - - // Collect the responses from the futures - Map, FileMetadata> resultsMap = new HashMap<>(responseMap.size()); - for (Map.Entry>, FileMetadata> entry : responseMap.entrySet()) { - resultsMap.put(entry.getKey().join(), entry.getValue()); - } - - return resultsMap; + private Map, List> splitAndCreateFileMetadata( + List fileMetadataList, + ConnectorServiceV1.ItemWriter createWriter) throws Exception { + return splitAndAct(fileMetadataList, createWriter, this::createFileMetadata); } /** @@ -1105,24 +1077,34 @@ private Map, FileMetadata> splitAndCreateFileMetadata(List * split the input into multiple batches. If you have a large batch of {@link FileMetadata} that * you would like to split across multiple requests, use the {@code splitAndCreateFileMetadata} method. 
* - * @param fileMetadata + * @param filesBatch * @param fileWriter * @return * @throws Exception */ - private CompletableFuture> createFileMetadata(FileMetadata fileMetadata, - ConnectorServiceV1.ItemWriter fileWriter) throws Exception { + private CompletableFuture> createFileMetadata(Collection filesBatch, + ConnectorServiceV1.ItemWriter fileWriter) throws Exception { String loggingPrefix = "createFileMetadata() - "; LOG.debug(loggingPrefix + "Received file metadata item / header to create."); + ImmutableList.Builder> insertItemsBuilder = ImmutableList.builder(); + for (FileMetadata fileMetadata : filesBatch) { + // build request object + insertItemsBuilder.add(toRequestInsertItem(fileMetadata)); + } // build request object Request postSeqBody = addAuthInfo(Request.create() - .withRequestParameters(toRequestInsertItem(fileMetadata))); + .withItems(insertItemsBuilder.build())); // post write request return fileWriter.writeItemsAsync(postSeqBody); } + @FunctionalInterface + public interface FileAction { + CompletableFuture> apply(Collection c, ConnectorServiceV1.ItemWriter w) throws Exception; + } + /** * Post a collection of {@link FileMetadata} update request on a separate thread. The response is wrapped in a * {@link CompletableFuture} that is returned immediately to the caller. 
@@ -1196,17 +1178,9 @@ private CompletableFuture> addFileAssets(Collection mapToId(List fileMetadataList) { - Map idMap = new HashMap<>(); - for (FileMetadata fileMetadata : fileMetadataList) { - if (fileMetadata.hasExternalId()) { - idMap.put(fileMetadata.getExternalId(), fileMetadata); - } else if (fileMetadata.hasId()) { - idMap.put(String.valueOf(fileMetadata.getId()), fileMetadata); - } else { - idMap.put("", fileMetadata); - } - } - return idMap; + return fileMetadataList.stream() + .map(fileMetadata -> Tuple.of(getFileId(fileMetadata).orElse(""), fileMetadata)) + .collect(Collectors.toMap(Tuple::x, Tuple::y)); } /* diff --git a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java index f4d22ccf..d201be89 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java @@ -326,12 +326,8 @@ public FileBinary downloadBinary(Request request) throws Exception { } } catch (Exception e) { catchedExceptions.add(e); - // depends on the execution path either of two may happen: - // 1. async call failed and cause is packed with CompletionException - // 2. sync call (small file) throws a direct exception - final Throwable cause = e instanceof CompletionException ? 
e.getCause() : e; // if we get a transient error, retry the call - if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause)) + if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { LOG.warn(loggingPrefix + "Transient error when downloading file (" + "response code: " + responseCode @@ -499,11 +495,10 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws } catch (Exception e) { catchedExceptions.add(e); // depends on the execution path either of two may happen: - // 1. async call failed and cause is packed with CompletionException - // 2. sync call (small file) throws a direct exception - final Throwable cause = e instanceof CompletionException ? e.getCause() : e; + // 1. async call failed and cause is packed with CompletionException (will be handled outside) + // 2. sync call (small file) throws a direct exception should be handled here. // if we get a transient error, retry the call - if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause)) + if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { apiRetryCounter++; LOG.warn(loggingPrefix + "Transient error when reading from Fusion (request id: " + requestId diff --git a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java index 3809fcf6..f7978ebc 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java @@ -241,11 +241,10 @@ public ResponseBinary executeRequest(Request request) throws Exception { } catch (Exception e) { catchedExceptions.add(e); // depends on the execution path either of two may happen: - // 1. async call failed and cause is packed with CompletionException + // 1. 
async call failed and cause is packed with CompletionException (will be handled outside) // 2. sync call (small file) throws a direct exception - final Throwable cause = e instanceof CompletionException ? e.getCause() : e; // if we get a transient error, retry the call - if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(cause)) + if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { apiRetryCounter++; LOG.warn(loggingPrefix + "Transient error when reading from Fusion (request id: " + requestId From 0afa7ddb260c0251a8c5e521470c33cc41afdfa0 Mon Sep 17 00:00:00 2001 From: Pavel Zubarev Date: Tue, 23 Nov 2021 14:27:53 +0100 Subject: [PATCH 6/9] Fix file creation --- src/main/java/com/cognite/client/Files.java | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/cognite/client/Files.java b/src/main/java/com/cognite/client/Files.java index 6a532011..da678a45 100644 --- a/src/main/java/com/cognite/client/Files.java +++ b/src/main/java/com/cognite/client/Files.java @@ -1066,7 +1066,13 @@ private Map, List> splitAndAddAssets( private Map, List> splitAndCreateFileMetadata( List fileMetadataList, ConnectorServiceV1.ItemWriter createWriter) throws Exception { - return splitAndAct(fileMetadataList, createWriter, this::createFileMetadata); + + // files API doesn't have batching support and accepts a single item at a time + Map, List> output = new HashMap<>(); + for(FileMetadata fileMetadata: fileMetadataList) { + output.putAll(splitAndAct(ImmutableList.of(fileMetadata), createWriter, this::createFileMetadata)); + } + return output; } /** @@ -1086,17 +1092,13 @@ private CompletableFuture> createFileMetadata(Collection> insertItemsBuilder = ImmutableList.builder(); - for (FileMetadata fileMetadata : filesBatch) { - // build request object - insertItemsBuilder.add(toRequestInsertItem(fileMetadata)); - } + // files endpoint doesn't support
batching + Preconditions.checkArgument(filesBatch.size() == 1); // build request object + FileMetadata fileMetadata = filesBatch.stream().findFirst().orElseThrow(); Request postSeqBody = addAuthInfo(Request.create() - .withItems(insertItemsBuilder.build())); + .withRequestParameters(toRequestInsertItem(fileMetadata))); - // post write request return fileWriter.writeItemsAsync(postSeqBody); } From c221c8b287884c957069cd397cc8ff9c734565f7 Mon Sep 17 00:00:00 2001 From: Pavel Zubarev Date: Tue, 23 Nov 2021 21:25:39 +0100 Subject: [PATCH 7/9] Refactor files metadata upsert --- src/main/java/com/cognite/client/Files.java | 128 ++++-------------- .../client/servicesV1/ConnectorServiceV1.java | 2 +- 2 files changed, 25 insertions(+), 105 deletions(-) diff --git a/src/main/java/com/cognite/client/Files.java b/src/main/java/com/cognite/client/Files.java index da678a45..4ae91f79 100644 --- a/src/main/java/com/cognite/client/Files.java +++ b/src/main/java/com/cognite/client/Files.java @@ -29,6 +29,7 @@ import com.google.cloud.Tuple; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import okhttp3.internal.http2.StreamResetException; import org.apache.commons.lang3.RandomStringUtils; import org.jetbrains.annotations.NotNull; @@ -193,57 +194,19 @@ public List upsert(List fileMetadataList) throws Exc ConnectorServiceV1.ItemWriter updateWriter = getClient().getConnectorService().updateFileHeaders(); ConnectorServiceV1.ItemWriter createWriter = getClient().getConnectorService().writeFileHeaders(); - // naive de-duplication based on ids - Map internalIdUpdateMap = new HashMap<>(1000); - Map externalIdUpdateMap = new HashMap<>(1000); - Map internalIdAssetsMap = new HashMap<>(50); - Map externalIdAssetsMap = new HashMap<>(50); - for (FileMetadata value : fileMetadataList) { - if (value.hasExternalId()) { - externalIdUpdateMap.put(value.getExternalId(), value); - } else if (value.hasId()) { - 
internalIdUpdateMap.put(value.getId(), value); - } else { - throw new Exception("File metadata item does not contain id nor externalId: " + value.toString()); - } - } - - // Check for files with >1k assets. Set the extra assets aside, so we can add them in separate updates. - for (Long key : internalIdUpdateMap.keySet()) { - FileMetadata fileMetadata = internalIdUpdateMap.get(key); - if (fileMetadata.getAssetIdsCount() > 1000) { - internalIdUpdateMap.put(key, fileMetadata.toBuilder() - .clearAssetIds() - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(0,1000)) - .build()); - internalIdAssetsMap.put(key, FileMetadata.newBuilder() - .setId(fileMetadata.getId()) - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(1000, fileMetadata.getAssetIdsList().size())) - .build()); - } - } - for (String key : externalIdUpdateMap.keySet()) { - FileMetadata fileMetadata = externalIdUpdateMap.get(key); - if (fileMetadata.getAssetIdsCount() > 1000) { - externalIdUpdateMap.put(key, fileMetadata.toBuilder() - .clearAssetIds() - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(0,1000)) - .build()); - externalIdAssetsMap.put(key, FileMetadata.newBuilder() - .setExternalId(fileMetadata.getExternalId()) - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(1000, fileMetadata.getAssetIdsList().size())) - .build()); - } - } + // mapToId eliminates duplicates + final Collection uniqueFilesMetadata = mapToId(fileMetadataList).values(); // Combine the input into list - List elementListUpdate = new ArrayList<>(); + List elementListUpdate = uniqueFilesMetadata.stream() + .map(fileHeader -> fileHeader.toBuilder() + .clearAssetIds() + .addAllAssetIds(fileHeader.getAssetIdsList().subList(0, 1000)) + .build()) + .collect(Collectors.toList()); List elementListCreate = new ArrayList<>(); List elementListCompleted = new ArrayList<>(); - elementListUpdate.addAll(externalIdUpdateMap.values()); - elementListUpdate.addAll(internalIdUpdateMap.values()); - /* The upsert loop. 
If there are items left to insert or update: 1. Update elements @@ -342,63 +305,22 @@ public List upsert(List fileMetadataList) throws Exc file create/update (all the code above) and subsequent update requests which add the remaining assetIds (code below). */ - Map internalIdTempMap = new HashMap<>(); - Map externalIdTempMap = new HashMap<>(); - List elementListAssetUpdate = new ArrayList<>(); - while (internalIdAssetsMap.size() > 0 || externalIdAssetsMap.size() > 0) { + final List elementListAssetUpdate = uniqueFilesMetadata.stream() + .parallel() + .filter(fileMetadata -> fileMetadata.getAssetIdsList().size() > 1000) + .flatMap(fileMetadata -> Lists.partition(fileMetadata.getAssetIdsList(), 1000).stream() + .skip(1) // first 1000 was written already + .map(collection -> fileMetadata.toBuilder() + .clearAssetIds() + .addAllAssetIds(collection) + .build()) + ) + .collect(Collectors.toList()); + + while (elementListAssetUpdate.size() > 0) { LOG.info(loggingPrefix + "Some files have very high assetId cardinality (+1k). 
Adding assetId to " - + (internalIdAssetsMap.size() + externalIdAssetsMap.size()) + + elementListAssetUpdate.size() + " file(s)."); - internalIdUpdateMap.clear(); - externalIdUpdateMap.clear(); - internalIdTempMap.clear(); - externalIdTempMap.clear(); - - // Check for files with >1k remaining assets - for (Long key : internalIdAssetsMap.keySet()) { - FileMetadata fileMetadata = internalIdAssetsMap.get(key); - if (fileMetadata.getAssetIdsCount() > 1000) { - internalIdUpdateMap.put(key, fileMetadata.toBuilder() - .clearAssetIds() - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(0,1000)) - .build()); - internalIdTempMap.put(key, FileMetadata.newBuilder() - .setId(fileMetadata.getId()) - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(1000, fileMetadata.getAssetIdsList().size())) - .build()); - } else { - // The entire assetId list can be pushed in a single update - internalIdUpdateMap.put(key, fileMetadata); - } - } - internalIdAssetsMap.clear(); - internalIdAssetsMap.putAll(internalIdTempMap); - - for (String key : externalIdAssetsMap.keySet()) { - FileMetadata fileMetadata = externalIdAssetsMap.get(key); - if (fileMetadata.getAssetIdsCount() > 1000) { - externalIdUpdateMap.put(key, fileMetadata.toBuilder() - .clearAssetIds() - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(0,1000)) - .build()); - externalIdTempMap.put(key, FileMetadata.newBuilder() - .setExternalId(fileMetadata.getExternalId()) - .addAllAssetIds(fileMetadata.getAssetIdsList().subList(1000, fileMetadata.getAssetIdsList().size())) - .build()); - } else { - // The entire assetId list can be pushed in a single update - externalIdUpdateMap.put(key, fileMetadata); - } - } - externalIdAssetsMap.clear(); - externalIdAssetsMap.putAll(externalIdTempMap); - - // prepare the update and send request - LOG.info(loggingPrefix + "Building update request to add assetIds for {} files.", - internalIdUpdateMap.size() + externalIdUpdateMap.size()); - elementListAssetUpdate.clear(); - 
elementListAssetUpdate.addAll(externalIdUpdateMap.values()); - elementListAssetUpdate.addAll(internalIdUpdateMap.values()); // should not happen, but need to check if (elementListAssetUpdate.isEmpty()) { @@ -437,8 +359,6 @@ public List upsert(List fileMetadataList) throws Exc elementListCompleted.size())); } - - return elementListCompleted.stream() .map(this::parseFileMetadata) .collect(Collectors.toList()); @@ -1182,7 +1102,7 @@ private CompletableFuture> addFileAssets(Collection mapToId(List fileMetadataList) { return fileMetadataList.stream() .map(fileMetadata -> Tuple.of(getFileId(fileMetadata).orElse(""), fileMetadata)) - .collect(Collectors.toMap(Tuple::x, Tuple::y)); + .collect(Collectors.toMap(Tuple::x, Tuple::y, (o1, o2) -> o1)); // remove duplicates by keeping the first occurrence } /* diff --git a/src/main/java/com/cognite/client/servicesV1/ConnectorServiceV1.java b/src/main/java/com/cognite/client/servicesV1/ConnectorServiceV1.java index 71b6aec9..3b16a97c 100644 --- a/src/main/java/com/cognite/client/servicesV1/ConnectorServiceV1.java +++ b/src/main/java/com/cognite/client/servicesV1/ConnectorServiceV1.java @@ -1049,7 +1049,7 @@ public ItemWriter writeFileHeaders() { FilesUploadRequestProvider requestProvider = FilesUploadRequestProvider.builder() .setEndpoint("files") - .setRequest(Request.create()) + .setRequest(Request.create().withRootParameter("overwrite", true)) .setSdkIdentifier(getClient().getClientConfig().getSdkIdentifier()) .setAppIdentifier(getClient().getClientConfig().getAppIdentifier()) .setSessionIdentifier(getClient().getClientConfig().getSessionIdentifier()) From c16ed6a9eebdc7bd224efc2d82ce8b065ced760b Mon Sep 17 00:00:00 2001 From: Pavel Zubarev Date: Tue, 23 Nov 2021 21:29:08 +0100 Subject: [PATCH 8/9] remove redundant comments --- .../client/servicesV1/executor/FileBinaryRequestExecutor.java | 3 --- .../cognite/client/servicesV1/executor/RequestExecutor.java | 3 --- 2 files changed, 6 deletions(-) diff --git
a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java index d201be89..e9a3ef2a 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/FileBinaryRequestExecutor.java @@ -494,9 +494,6 @@ public ResponseBinary uploadBinary(FileBinary fileBinary, URL targetURL) throws .withApiRetryCounter(apiRetryCounter); } catch (Exception e) { catchedExceptions.add(e); - // depends on the execution path either of two may happen: - // 1. async call failed and cause is packed with CompletionException (will be handled outside) - // 2. sync call (small file) throws a direct exception should be handled here. // if we get a transient error, retry the call if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { diff --git a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java index f7978ebc..52f936c6 100644 --- a/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java +++ b/src/main/java/com/cognite/client/servicesV1/executor/RequestExecutor.java @@ -240,9 +240,6 @@ public ResponseBinary executeRequest(Request request) throws Exception { .withApiRetryCounter(apiRetryCounter); } catch (Exception e) { catchedExceptions.add(e); - // depends on the execution path either of two may happen: - // 1. async call failed and cause is packed with CompletionException (will be handled outside) - // 2. 
sync call (small file) throws a direct exception // if we get a transient error, retry the call if (RETRYABLE_EXCEPTIONS.stream().anyMatch(known -> known.isInstance(e)) || RETRYABLE_RESPONSE_CODES.contains(responseCode)) { From 68f2e29784ea8dea559f6640a3560e0fe536478a Mon Sep 17 00:00:00 2001 From: Pavel Zubarev Date: Wed, 24 Nov 2021 09:52:28 +0100 Subject: [PATCH 9/9] don't use while --- src/main/java/com/cognite/client/Files.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/main/java/com/cognite/client/Files.java b/src/main/java/com/cognite/client/Files.java index 4ae91f79..e03f29dc 100644 --- a/src/main/java/com/cognite/client/Files.java +++ b/src/main/java/com/cognite/client/Files.java @@ -317,18 +317,11 @@ public List upsert(List fileMetadataList) throws Exc ) .collect(Collectors.toList()); - while (elementListAssetUpdate.size() > 0) { + if (elementListAssetUpdate.size() > 0) { LOG.info(loggingPrefix + "Some files have very high assetId cardinality (+1k). Adding assetId to " + elementListAssetUpdate.size() + " file(s)."); - // should not happen, but need to check - if (elementListAssetUpdate.isEmpty()) { - String message = loggingPrefix + "Internal error. Not able to send assetId update. The payload is empty."; - LOG.error(message); - throw new Exception(message); - } - Map, List> responseItemsAssets = splitAndAddAssets(elementListAssetUpdate, updateWriter); for (ResponseItems responseItems : responseItemsAssets.keySet()) {