diff --git a/CHANGES.txt b/CHANGES.txt index 56c088f42..94652903e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,10 @@ -0.2.0 +0.3.0 ----- + * File descriptor leak after file streamed in Sidecar Client (CASSANALYTICS-103) * Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102) + +0.2.0 +----- * Generated distribution artifacts fix (CASSANALYTICS-105) * Fix SSTable descriptor mismatch preventing newly produced SSTables from being uploaded (CASSANALYTICS-98) * Expose SidecarCdc builders and interfaces (CASSANALYTICS-94) diff --git a/analytics-sidecar-client/build.gradle b/analytics-sidecar-client/build.gradle index 202c592d7..5b7484c68 100644 --- a/analytics-sidecar-client/build.gradle +++ b/analytics-sidecar-client/build.gradle @@ -39,7 +39,7 @@ sourceCompatibility = 1.8 test { useJUnitPlatform() testLogging { - events "passed", "skipped", "failed" + events "started", "passed", "skipped", "failed" } maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 reports { diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java index f4c3890df..606e933fb 100644 --- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java +++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java @@ -57,6 +57,7 @@ import org.apache.cassandra.sidecar.common.request.Request; import org.apache.cassandra.sidecar.common.request.UploadableRequest; +import static java.lang.String.valueOf; import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE; import static org.apache.cassandra.sidecar.common.utils.StringUtils.isNullOrEmpty; @@ -165,11 +166,7 @@ protected CompletableFuture executeUploadFileInternal(SidecarInsta Promise promise = Promise.promise(); // open the local file openFileForRead(vertx.fileSystem(), filename) - .compose(pair -> vertxRequest.ssl(config.ssl()) - .putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), - String.valueOf(pair.getKey())) - .sendStream(pair.getValue() - .setReadBufferSize(config.sendReadBufferSize()))) + .compose(pair -> sendFileStream(vertxRequest, pair, filename)) .onFailure(promise::fail) .onSuccess(response -> { byte[] raw = response.body() != null ? response.body().getBytes() : null; @@ -184,6 +181,43 @@ protected CompletableFuture executeUploadFileInternal(SidecarInsta return promise.future().toCompletionStage().toCompletableFuture(); } + /** + * Sends the file stream via HTTP request. + * + * @param vertxRequest the HTTP request to send the file stream with + * @param pair a pair containing file size and the AsyncFile handle + * @param filename the name of the file being uploaded (for logging purposes) + * @return a Future that completes when the file has been sent + */ + protected Future> sendFileStream( + HttpRequest vertxRequest, + AbstractMap.SimpleEntry pair, + String filename) + { + AsyncFile asyncFile = pair.getValue(); + return vertxRequest.ssl(config.ssl()) + .putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), + valueOf(pair.getKey())) + .sendStream(pair.getValue() + .setReadBufferSize(config.sendReadBufferSize())) + .eventually(() -> { + // vertx.setTimer(1000, timerId -> { + // Defer file closing for 1 second + try + { + return asyncFile.close().onFailure(err -> + LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err)); + } + catch (Exception ex) + { + LOGGER.warn("Failed due to exception for filename='{}'", filename, ex); + return Future.failedFuture(ex); + } + + //}); + }); + } + /** * {@inheritDoc} */ diff --git a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java index 8d8d0952d..8296952b1 100644 --- a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java +++ b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java @@ -18,16 +18,36 @@ package org.apache.cassandra.sidecar.client; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.file.AsyncFile; import io.vertx.ext.web.client.HttpRequest; +import io.vertx.ext.web.client.HttpResponse; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.apache.cassandra.sidecar.client.request.RequestExecutorTest; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static java.nio.file.Files.copy; import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,17 +57,35 @@ public class VertxHttpClientTest { private static Vertx vertx; + private MockWebServer mockServer; + private HttpClientConfig config; + private SidecarInstanceImpl sidecarInstance; - @BeforeAll - public static void setUp() + @BeforeEach + public void setUp() throws IOException { vertx = Vertx.vertx(); + mockServer = new MockWebServer(); + mockServer.start(); + + config = new HttpClientConfig.Builder<>() + .ssl(false) + .timeoutMillis(30000) + .build(); + sidecarInstance = RequestExecutorTest.newSidecarInstance(mockServer); } - @AfterAll - public static void tearDown() + @AfterEach + public void tearDown() throws IOException { - vertx.close(); + if (mockServer != null) + { + mockServer.shutdown(); + } + if (vertx != null) + { + vertx.close(); + } } @Test @@ -74,4 +112,116 @@ private HttpClientConfig.Builder httpClientConfigBuilder() .timeoutMillis(100) .idleTimeoutMillis(100); } + + @Test + void testUploadSSTableClosesFile(@TempDir Path tempDirectory) throws Exception + { + runTestScenario(tempDirectory, + new MockResponse().setResponseCode(OK.code()), + new ExposeAsyncFileVertxHttpClient(vertx, config)); + } + + @Test + void testUploadClosesFileOnUploadFailure(@TempDir Path tempDirectory) throws Exception + { + runTestScenario(tempDirectory, + new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()), + new ExposeAsyncFileVertxHttpClient(vertx, config)); + } + + @Test + void testMultipleUploadClosesAllFiles(@TempDir Path tempDirectory) throws Exception + { + mockServer.enqueue(new MockResponse().setResponseCode(OK.code())); + mockServer.enqueue(new MockResponse().setResponseCode(OK.code())); + mockServer.enqueue(new MockResponse().setResponseCode(OK.code())); + + Path fileToUpload = prepareFile(tempDirectory); + + ExposeAsyncFileVertxHttpClient httpClient = new ExposeAsyncFileVertxHttpClient(vertx, config); + + // Upload the same file 3 times (simulating multiple file uploads) + for (int i = 0; i < 3; i++) + { + HttpRequest vertxRequest = httpClient.webClient.put(mockServer.getPort(), + mockServer.getHostName(), + "/upload/test" + i); + httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, fileToUpload.toString()) + .get(30, TimeUnit.SECONDS); + } + + assertThat(mockServer.getRequestCount()).isEqualTo(3); + assertThat(httpClient.capturedFiles).hasSize(3); + + // Give async file close operations time to complete + Thread.sleep(100); + + // Verify all the files are closed by attempting to call .end() which should throw IllegalStateException + for (AsyncFile file : httpClient.capturedFiles) + { + assertThatThrownBy(file::end) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("File handle is closed"); + } + } + + private void runTestScenario(Path tempDirectory, + MockResponse mockResponse, + ExposeAsyncFileVertxHttpClient httpClient) throws Exception + { + mockServer.enqueue(mockResponse); + + Path fileToUpload = prepareFile(tempDirectory); + HttpRequest vertxRequest = httpClient.webClient.put(mockServer.getPort(), + mockServer.getHostName(), + "/upload/test"); + + httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, fileToUpload.toString()) + .get(30, TimeUnit.SECONDS); + + assertThat(mockServer.getRequestCount()).isEqualTo(1); + + // Give async file close operation time to complete + Thread.sleep(100); + + // Verify file is closed by attempting to call .end() which should throw IllegalStateException + assertThat(httpClient.capturedFiles).hasSize(1); + assertThatThrownBy(() -> httpClient.capturedFiles.get(0).end()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("File handle is closed"); + } + + /** + * Class that extends from {@link VertxHttpClient} for testing purposes and holds a reference to the + * {@link AsyncFile} to ensure that the file has been closed. + */ + static class ExposeAsyncFileVertxHttpClient extends VertxHttpClient + { + List capturedFiles = new ArrayList<>(); + + ExposeAsyncFileVertxHttpClient(Vertx vertx, HttpClientConfig config) + { + super(vertx, config); + } + + @Override + protected Future> sendFileStream(HttpRequest vertxRequest, + SimpleEntry pair, + String filename) + { + capturedFiles.add(pair.getValue()); + return super.sendFileStream(vertxRequest, pair, filename); + } + } + + private Path prepareFile(Path tempDirectory) throws IOException + { + Path fileToUpload = tempDirectory.resolve("nb-1-big-TOC.txt"); + try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("sstables/nb-1-big-TOC.txt")) + { + assertThat(inputStream).isNotNull(); + copy(inputStream, fileToUpload, StandardCopyOption.REPLACE_EXISTING); + } + return fileToUpload; + } } diff --git a/cassandra-analytics-integration-tests/build.gradle b/cassandra-analytics-integration-tests/build.gradle index d8d469f66..0f279bf6a 100644 --- a/cassandra-analytics-integration-tests/build.gradle +++ b/cassandra-analytics-integration-tests/build.gradle @@ -90,6 +90,7 @@ test { systemProperty "SKIP_STARTUP_VALIDATIONS", "true" systemProperty "logback.configurationFile", "src/test/resources/logback-test.xml" systemProperty "cassandra.integration.sidecar.test.enable_mtls", integrationEnableMtls + systemProperty "vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory" minHeapSize = '1g' maxHeapSize = integrationMaxHeapSize maxParallelForks = integrationMaxParallelForks @@ -102,7 +103,7 @@ test { } testLogging { - events "passed", "skipped", "failed" + events "started", "passed", "skipped", "failed" showExceptions true exceptionFormat "full"