From f610995867ba5218785f63f95ae74d6d2a73d79e Mon Sep 17 00:00:00 2001 From: ssekaran Date: Tue, 2 Dec 2025 15:05:22 -0800 Subject: [PATCH 01/12] Fixing file descriptor leak after file streamed in Sidecar Client --- .../sidecar/client/VertxHttpClient.java | 33 +++- .../sidecar/client/VertxHttpClientTest.java | 162 +++++++++++++++++- 2 files changed, 185 insertions(+), 10 deletions(-) diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java index f4c3890df..939ba8172 100644 --- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java +++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java @@ -57,6 +57,7 @@ import org.apache.cassandra.sidecar.common.request.Request; import org.apache.cassandra.sidecar.common.request.UploadableRequest; +import static java.lang.String.valueOf; import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE; import static org.apache.cassandra.sidecar.common.utils.StringUtils.isNullOrEmpty; @@ -165,11 +166,7 @@ protected CompletableFuture executeUploadFileInternal(SidecarInsta Promise promise = Promise.promise(); // open the local file openFileForRead(vertx.fileSystem(), filename) - .compose(pair -> vertxRequest.ssl(config.ssl()) - .putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), - String.valueOf(pair.getKey())) - .sendStream(pair.getValue() - .setReadBufferSize(config.sendReadBufferSize()))) + .compose(pair -> sendFileStream(vertxRequest, pair, filename)) .onFailure(promise::fail) .onSuccess(response -> { byte[] raw = response.body() != null ? response.body().getBytes() : null; @@ -184,6 +181,32 @@ protected CompletableFuture executeUploadFileInternal(SidecarInsta return promise.future().toCompletionStage().toCompletableFuture(); } + /** + * Sends the file stream via HTTP request. + * + * @param vertxRequest the HTTP request to send the file stream with + * @param pair a pair containing file size and the AsyncFile handle + * @param filename the name of the file being uploaded (for logging purposes) + * @return a Future that completes when the file has been sent + */ + protected Future> sendFileStream( + HttpRequest vertxRequest, + AbstractMap.SimpleEntry pair, + String filename) + { + AsyncFile asyncFile = pair.getValue(); + return vertxRequest.ssl(config.ssl()) + .putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), + valueOf(pair.getKey())) + .sendStream(pair.getValue() + .setReadBufferSize(config.sendReadBufferSize())) + .onComplete(ar -> { + asyncFile.close().onFailure(err -> + LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) + ); + }); + } + /** * {@inheritDoc} */ diff --git a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java index 8d8d0952d..021521549 100644 --- a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java +++ b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java @@ -18,16 +18,38 @@ package org.apache.cassandra.sidecar.client; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.file.AsyncFile; import io.vertx.ext.web.client.HttpRequest; +import io.vertx.ext.web.client.HttpResponse; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.apache.cassandra.sidecar.client.request.RequestExecutorTest; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static java.nio.file.Files.copy; import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,17 +59,35 @@ public class VertxHttpClientTest { private static Vertx vertx; + private MockWebServer mockServer; + private HttpClientConfig config; + private SidecarInstanceImpl sidecarInstance; - @BeforeAll - public static void setUp() + @BeforeEach + public void setUp() throws IOException { vertx = Vertx.vertx(); + mockServer = new MockWebServer(); + mockServer.start(); + + config = new HttpClientConfig.Builder<>() + .ssl(false) + .timeoutMillis(30000) + .build(); + sidecarInstance = RequestExecutorTest.newSidecarInstance(mockServer); } - @AfterAll - public static void tearDown() + @AfterEach + public void tearDown() throws IOException { - vertx.close(); + if (mockServer != null) + { + mockServer.shutdown(); + } + if (vertx != null) + { + vertx.close(); + } } @Test @@ -74,4 +114,116 @@ private HttpClientConfig.Builder httpClientConfigBuilder() .timeoutMillis(100) .idleTimeoutMillis(100); } + + @Test + void testUploadSSTableClosesFile(@TempDir Path tempDirectory) throws Exception + { + runTestScenario(tempDirectory, + new MockResponse().setResponseCode(OK.code()), + new ExposeAsyncFileVertxHttpClient(vertx, config)); + } + + @Test + void testUploadClosesFileOnUploadFailure(@TempDir Path tempDirectory) throws Exception + { + runTestScenario(tempDirectory, + new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()), + new ExposeAsyncFileVertxHttpClient(vertx, config)); + } + + @Test + void testMultipleUploadClosesAllFiles(@TempDir Path tempDirectory) throws Exception + { + mockServer.enqueue(new MockResponse().setResponseCode(OK.code())); + mockServer.enqueue(new MockResponse().setResponseCode(OK.code())); + mockServer.enqueue(new MockResponse().setResponseCode(OK.code())); + + Path fileToUpload = prepareFile(tempDirectory); + + ExposeAsyncFileVertxHttpClient httpClient = new ExposeAsyncFileVertxHttpClient(vertx, config); + + // Upload the same file 3 times (simulating multiple file uploads) + for (int i = 0; i < 3; i++) + { + HttpRequest vertxRequest = httpClient.webClient.put(mockServer.getPort(), + mockServer.getHostName(), + "/upload/test" + i); + httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, fileToUpload.toString()) + .get(30, TimeUnit.SECONDS); + } + + assertThat(mockServer.getRequestCount()).isEqualTo(3); + assertThat(httpClient.capturedFiles).hasSize(3); + + // Give async file close operations time to complete + Thread.sleep(100); + + // Verify all the files are closed by attempting to call .end() which should throw IllegalStateException + for (AsyncFile file : httpClient.capturedFiles) + { + assertThatThrownBy(file::end) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("File handle is closed" ); + } + } + + private void runTestScenario(Path tempDirectory, + MockResponse mockResponse, + ExposeAsyncFileVertxHttpClient httpClient) throws Exception + { + mockServer.enqueue(mockResponse); + + Path fileToUpload = prepareFile(tempDirectory); + HttpRequest vertxRequest = httpClient.webClient.put(mockServer.getPort(), + mockServer.getHostName(), + "/upload/test" ); + + httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, fileToUpload.toString()) + .get(30, TimeUnit.SECONDS); + + assertThat(mockServer.getRequestCount()).isEqualTo(1); + + // Give async file close operation time to complete + Thread.sleep(100); + + // Verify file is closed by attempting to call .end() which should throw IllegalStateException + assertThat(httpClient.capturedFiles).hasSize(1); + assertThatThrownBy(() -> httpClient.capturedFiles.get(0).end()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("File handle is closed" ); + } + + /** + * Class that extends from {@link VertxHttpClient} for testing purposes and holds a reference to the + * {@link AsyncFile} to ensure that the file has been closed. + */ + static class ExposeAsyncFileVertxHttpClient extends VertxHttpClient + { + List capturedFiles = new ArrayList<>(); + + ExposeAsyncFileVertxHttpClient(Vertx vertx, HttpClientConfig config) + { + super(vertx, config); + } + + @Override + protected Future> sendFileStream(HttpRequest vertxRequest, + SimpleEntry pair, + String filename) + { + capturedFiles.add(pair.getValue()); + return super.sendFileStream(vertxRequest, pair, filename); + } + } + + private Path prepareFile(Path tempDirectory) throws IOException + { + Path fileToUpload = tempDirectory.resolve("nb-1-big-TOC.txt" ); + try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("sstables/nb-1-big-TOC.txt" )) + { + assertThat(inputStream).isNotNull(); + copy(inputStream, fileToUpload, StandardCopyOption.REPLACE_EXISTING); + } + return fileToUpload; + } } From 9d9d58cd048fbfac98091461c43ef9b03baebc81 Mon Sep 17 00:00:00 2001 From: ssekaran Date: Tue, 2 Dec 2025 15:59:22 -0800 Subject: [PATCH 02/12] changes.txt update --- CHANGES.txt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index 56c088f42..94652903e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,10 @@ -0.2.0 +0.3.0 ----- + * File descriptor leak after file streamed in Sidecar Client (CASSANALYTICS-103) * Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102) + +0.2.0 +----- * Generated distribution artifacts fix (CASSANALYTICS-105) * Fix SSTable descriptor mismatch preventing newly produced SSTables from being uploaded (CASSANALYTICS-98) * Expose SidecarCdc builders and interfaces (CASSANALYTICS-94) From 00adac1f9bbbe8cc35e14dfef9456e973704a561 Mon Sep 17 00:00:00 2001 From: ssekaran Date: Tue, 2 Dec 2025 16:52:44 -0800 Subject: [PATCH 03/12] Formatting --- .../sidecar/client/VertxHttpClientTest.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java index 021521549..8296952b1 100644 --- a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java +++ b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java @@ -27,9 +27,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -163,7 +161,7 @@ void testMultipleUploadClosesAllFiles(@TempDir Path tempDirectory) throws Except { assertThatThrownBy(file::end) .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("File handle is closed" ); + .hasMessageContaining("File handle is closed"); } } @@ -176,7 +174,7 @@ private void runTestScenario(Path tempDirectory, Path fileToUpload = prepareFile(tempDirectory); HttpRequest vertxRequest = httpClient.webClient.put(mockServer.getPort(), mockServer.getHostName(), - "/upload/test" ); + "/upload/test"); httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, fileToUpload.toString()) .get(30, TimeUnit.SECONDS); @@ -190,7 +188,7 @@ private void runTestScenario(Path tempDirectory, assertThat(httpClient.capturedFiles).hasSize(1); assertThatThrownBy(() -> httpClient.capturedFiles.get(0).end()) .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("File handle is closed" ); + .hasMessageContaining("File handle is closed"); } /** @@ -218,8 +216,8 @@ protected Future> sendFileStream(HttpRequest vertxR private Path prepareFile(Path tempDirectory) throws IOException { - Path fileToUpload = tempDirectory.resolve("nb-1-big-TOC.txt" ); - try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("sstables/nb-1-big-TOC.txt" )) + Path fileToUpload = tempDirectory.resolve("nb-1-big-TOC.txt"); + try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("sstables/nb-1-big-TOC.txt")) { assertThat(inputStream).isNotNull(); copy(inputStream, fileToUpload, StandardCopyOption.REPLACE_EXISTING); From 054fe75ad3238d718427e64b1e7721864cc7a0ad Mon Sep 17 00:00:00 2001 From: ssekaran Date: Tue, 9 Dec 2025 11:36:13 -0800 Subject: [PATCH 04/12] exception handling --- .../cassandra/sidecar/client/VertxHttpClient.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java index 939ba8172..c18ab4e12 100644 --- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java +++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java @@ -201,9 +201,16 @@ protected Future> sendFileStream( .sendStream(pair.getValue() .setReadBufferSize(config.sendReadBufferSize())) .onComplete(ar -> { - asyncFile.close().onFailure(err -> - LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) - ); + try + { + asyncFile.close().onFailure(err -> + LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) + ); + } + catch (IllegalStateException ex) + { + LOGGER.warn("File already closed, ignoring close attempt: filename='{}'", filename, ex); + } }); } From 9772197ad34686ed5434be9ce13799bc5cdb9c2c Mon Sep 17 00:00:00 2001 From: ssekaran Date: Tue, 9 Dec 2025 17:19:43 -0800 Subject: [PATCH 05/12] logging --- analytics-sidecar-client/build.gradle | 2 +- cassandra-analytics-integration-tests/build.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/analytics-sidecar-client/build.gradle b/analytics-sidecar-client/build.gradle index 202c592d7..5b7484c68 100644 --- a/analytics-sidecar-client/build.gradle +++ b/analytics-sidecar-client/build.gradle @@ -39,7 +39,7 @@ sourceCompatibility = 1.8 test { useJUnitPlatform() testLogging { - events "passed", "skipped", "failed" + events "started", "passed", "skipped", "failed" } maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 reports { diff --git a/cassandra-analytics-integration-tests/build.gradle b/cassandra-analytics-integration-tests/build.gradle index d8d469f66..776d30dd2 100644 --- a/cassandra-analytics-integration-tests/build.gradle +++ b/cassandra-analytics-integration-tests/build.gradle @@ -102,7 +102,7 @@ test { } testLogging { - events "passed", "skipped", "failed" + events "started", "passed", "skipped", "failed" showExceptions true exceptionFormat "full" From 23323f5660c465155a3ac10b2ed0308877198ab4 Mon Sep 17 00:00:00 2001 From: ssekaran Date: Wed, 10 Dec 2025 11:48:02 -0800 Subject: [PATCH 06/12] Testing for confirmation --- .../sidecar/client/VertxHttpClient.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java index c18ab4e12..2c0f3be15 100644 --- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java +++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java @@ -201,16 +201,16 @@ protected Future> sendFileStream( .sendStream(pair.getValue() .setReadBufferSize(config.sendReadBufferSize())) .onComplete(ar -> { - try - { - asyncFile.close().onFailure(err -> - LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) - ); - } - catch (IllegalStateException ex) - { - LOGGER.warn("File already closed, ignoring close attempt: filename='{}'", filename, ex); - } +// try +// { +// asyncFile.close().onFailure(err -> +// LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) +// ); +// } +// catch (IllegalStateException ex) +// { +// LOGGER.warn("File already closed, ignoring close attempt: filename='{}'", filename, ex); +// } }); } From 7f322e3ff8e76568c37c0184f36804002bcf0e16 Mon Sep 17 00:00:00 2001 From: ssekaran Date: Wed, 10 Dec 2025 14:22:22 -0800 Subject: [PATCH 07/12] Added delay --- .../sidecar/client/VertxHttpClient.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java index 2c0f3be15..2f8ab4374 100644 --- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java +++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java @@ -201,16 +201,19 @@ protected Future> sendFileStream( .sendStream(pair.getValue() .setReadBufferSize(config.sendReadBufferSize())) .onComplete(ar -> { -// try -// { -// asyncFile.close().onFailure(err -> -// LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) -// ); -// } -// catch (IllegalStateException ex) -// { -// LOGGER.warn("File already closed, ignoring close attempt: filename='{}'", filename, ex); -// } + vertx.setTimer(1000, timerId -> { + // Defer file closing for 1 second + try + { + asyncFile.close().onFailure(err -> + LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) + ); + } + catch (IllegalStateException ex) + { + LOGGER.warn("File already closed, ignoring close attempt: filename='{}'", filename, ex); + } + }); }); } From 99c8d5db4c868294fa7f0075fe5e0e327622f133 Mon Sep 17 00:00:00 2001 From: ssekaran Date: Thu, 11 Dec 2025 09:37:39 -0800 Subject: [PATCH 08/12] Added vertx logging --- cassandra-analytics-integration-tests/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/cassandra-analytics-integration-tests/build.gradle b/cassandra-analytics-integration-tests/build.gradle index 776d30dd2..0f279bf6a 100644 --- a/cassandra-analytics-integration-tests/build.gradle +++ b/cassandra-analytics-integration-tests/build.gradle @@ -90,6 +90,7 @@ test { systemProperty "SKIP_STARTUP_VALIDATIONS", "true" systemProperty "logback.configurationFile", "src/test/resources/logback-test.xml" systemProperty "cassandra.integration.sidecar.test.enable_mtls", integrationEnableMtls + systemProperty "vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory" minHeapSize = '1g' maxHeapSize = integrationMaxHeapSize maxParallelForks = integrationMaxParallelForks From a200b73d6d71ce495c8102cca0cf35b1e3143579 Mon Sep 17 00:00:00 2001 From: ssekaran Date: Thu, 11 Dec 2025 09:47:13 -0800 Subject: [PATCH 09/12] Removing delay --- .../org/apache/cassandra/sidecar/client/VertxHttpClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java index 2f8ab4374..76358437b 100644 --- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java +++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java @@ -201,7 +201,7 @@ protected Future> sendFileStream( .sendStream(pair.getValue() .setReadBufferSize(config.sendReadBufferSize())) .onComplete(ar -> { - vertx.setTimer(1000, timerId -> { + // vertx.setTimer(1000, timerId -> { // Defer file closing for 1 second try { @@ -213,7 +213,7 @@ protected Future> sendFileStream( { LOGGER.warn("File already closed, ignoring close attempt: filename='{}'", filename, ex); } - }); + //}); }); } From d5fbe84728dca4fac6e7837e72c8dfb765d97e96 Mon Sep 17 00:00:00 2001 From: ssekaran Date: Thu, 11 Dec 2025 14:04:25 -0800 Subject: [PATCH 10/12] Replaced with eventually --- .../sidecar/client/VertxHttpClient.java | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java index 76358437b..a5c5cb32b 100644 --- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java +++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java @@ -200,19 +200,13 @@ protected Future> sendFileStream( valueOf(pair.getKey())) .sendStream(pair.getValue() .setReadBufferSize(config.sendReadBufferSize())) - .onComplete(ar -> { + .eventually(() -> { // vertx.setTimer(1000, timerId -> { // Defer file closing for 1 second - try - { - asyncFile.close().onFailure(err -> - LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) - ); - } - catch (IllegalStateException ex) - { - LOGGER.warn("File already closed, ignoring close attempt: filename='{}'", filename, ex); - } + return asyncFile.close().onFailure(err -> + LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) + ); + //}); }); } From b4909daaa2743302430a80333ee5e692902f29c3 Mon Sep 17 00:00:00 2001 From: ssekaran Date: Thu, 11 Dec 2025 18:32:00 -0800 Subject: [PATCH 11/12] catch with eventually --- .../cassandra/sidecar/client/VertxHttpClient.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java index a5c5cb32b..0e7c28c6d 100644 --- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java +++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java @@ -203,9 +203,13 @@ protected Future> sendFileStream( .eventually(() -> { // vertx.setTimer(1000, timerId -> { // Defer file closing for 1 second - return asyncFile.close().onFailure(err -> - LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err) - ); + try{ + return asyncFile.close().onFailure(err -> + LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err)); + }catch (Exception ex){ + LOGGER.warn("Failed due to exception for filename='{}'", filename, ex); + return Future.failedFuture(ex); + } //}); }); From 7e3ce2ef75b4264fc73a84777e4423e751f240bf Mon Sep 17 00:00:00 2001 From: ssekaran Date: Thu, 11 Dec 2025 18:40:55 -0800 Subject: [PATCH 12/12] checkstyle --- .../cassandra/sidecar/client/VertxHttpClient.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java index 0e7c28c6d..606e933fb 100644 --- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java +++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java @@ -201,12 +201,15 @@ protected Future> sendFileStream( .sendStream(pair.getValue() .setReadBufferSize(config.sendReadBufferSize())) .eventually(() -> { - // vertx.setTimer(1000, timerId -> { - // Defer file closing for 1 second - try{ + // vertx.setTimer(1000, timerId -> { + // Defer file closing for 1 second + try + { return asyncFile.close().onFailure(err -> LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err)); - }catch (Exception ex){ + } + catch (Exception ex) + { LOGGER.warn("Failed due to exception for filename='{}'", filename, ex); return Future.failedFuture(ex); }