diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java index 3ce3bf2e2..cec01fc0f 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java @@ -257,6 +257,26 @@ public void drain() throws IOException, InterruptedException, ExecutionException .drain(); } + /** + * {@inheritDoc} + */ + @Override + public List getParentRepairStatus(int cmd) + { + return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) + .getParentRepairStatus(cmd); + } + + /** + * {@inheritDoc} + */ + @Override + public int repair(String keyspace, Map repairOptions) + { + StorageJmxOperations ssProxy = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + return ssProxy.repairAsync(keyspace, repairOptions); + } + /** + * {@inheritDoc} + */ diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/RepairOptions.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/RepairOptions.java new file mode 100644 index 000000000..89604c32f --- /dev/null +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/RepairOptions.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +/** + * Enum representing the repair options supported. + * This class mimics the options supported by the repair operation in Apache Cassandra + * as defined in org.apache.cassandra.repair.messages.RepairOption + * (...) + */ +public enum RepairOptions +{ + /** + * Whether to repair only the primary range of the node (true/false) + */ + PRIMARY_RANGE("primaryRange"), + /** + * Whether to perform an incremental repair (true/false) + * If false, a full repair is performed + */ + INCREMENTAL("incremental"), + /** + * Specific token ranges to repair + */ + RANGES("ranges"), + /** + * List of column families (tables) to repair (comma-separated) + */ + COLUMNFAMILIES("columnFamilies"), + /** + * Restrict repair to specific data centers (comma-separated) + */ + DATACENTERS("dataCenters"), + /** + * Restrict repair to specific hosts (comma-separated IPs or hostnames) + */ + HOSTS("hosts"), + /** + * Force the repair operation + */ + FORCE_REPAIR("forceRepair"), + /** + * Type of preview repair to run before actual repair + * Options: "none", "auto", "running", "full" + * Mainly used to assess data consistency without actually repairing + */ + PREVIEW("previewKind"); + + private final String value; + + RepairOptions(String value) + { + this.value = value; + } + + /** + * @return Name corresponding to the repair option + */ + public String optionName() + { + return value; + } +} diff --git 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java index fb7532373..fd3202e84 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java @@ -225,4 +225,16 @@ public int getCompactionThroughputMbPerSec() { return delegate.getCompactionThroughputMbPerSec(); } + + @Override + public int repairAsync(String keyspace, Map options) + { + return delegate.repairAsync(keyspace, options); + } + + @Override + public List getParentRepairStatus(int cmd) + { + return delegate.getParentRepairStatus(cmd); + } } diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java index 11e838c4d..ff526e36e 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java @@ -233,4 +233,20 @@ public interface StorageJmxOperations * @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined */ int getCompactionThroughputMbPerSec(); + + /** + * Trigger an async repair operation for the given keyspace and options + * @param keyspace keyspace to be repaired + * @param options repair options + * @return repair command number, or 0 if nothing to repair + */ + int repairAsync(String keyspace, Map options); + + /** + * Get the status of a given parent repair session. 
+ * @param cmd the int reference returned when issuing the repair + * @return status of parent repair from ParentRepairStatus + * followed by final message or messages of the session + */ + List getParentRepairStatus(int cmd); } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index f9911b639..aee6ce466 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -147,6 +147,7 @@ public final class ApiEndpointsV1 public static final String NODE_DRAIN_ROUTE = API_V1 + CASSANDRA + "/operations/drain"; public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams"; public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + PER_TABLE + "/stats"; + public static final String REPAIR_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + "/repair"; // Live Migration APIs diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/RepairRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/RepairRequest.java new file mode 100644 index 000000000..f0ee9db65 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/RepairRequest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + +/** + * Class representing the repair API request + */ +public class RepairRequest extends JsonRequest +{ + private final RepairPayload repairRequestPayload; + + + /** + * Constructs a Sidecar repair request with the given parameters. + * + * @param keyspace name of the keyspace in the cluster + * @param repairRequestPayload request payload + */ + public RepairRequest(String keyspace, RepairPayload repairRequestPayload) + { + super(requestURI(keyspace)); + this.repairRequestPayload = repairRequestPayload; + } + + @Override + public HttpMethod method() + { + return HttpMethod.PUT; + } + + @Override + public Object requestBody() + { + return repairRequestPayload; + } + + static String requestURI(String keyspace) + { + return ApiEndpointsV1.REPAIR_ROUTE + .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM, keyspace); + } +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/RepairPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/RepairPayload.java new file mode 100644 index 000000000..be92504f0 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/RepairPayload.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Request payload for a repair job + */ +@JsonDeserialize(builder = RepairPayload.Builder.class) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class RepairPayload +{ + private static final String TABLES = "tables"; + private static final String IS_PRIMARY_RANGE = "primaryRange"; + private static final String DATACENTER = "datacenter"; + private static final String HOSTS = "hosts"; + private static final String START_TOKEN = "startToken"; + private static final String END_TOKEN = "endToken"; + private static final String REPAIR_TYPE = "repairType"; + private static final String FORCE = "force"; + private 
static final String VALIDATE = "validate"; + + private final List tables; + private final Boolean isPrimaryRange; + private final String datacenter; + private final List hosts; + private final String startToken; + private final String endToken; + private final RepairType repairType; + private final Boolean force; + private final Boolean validate; + + public static RepairPayload.Builder builder() + { + return new RepairPayload.Builder(); + } + + /** + * Constructs a new {@link RepairPayload} from the configured {@link RepairPayload.Builder}. + * + * @param builder the builder used to create this object + */ + protected RepairPayload(RepairPayload.Builder builder) + { + tables = builder.tables; + repairType = builder.repairType; + isPrimaryRange = builder.isPrimaryRange; + datacenter = builder.datacenter; + hosts = builder.hosts; + startToken = builder.startToken; + endToken = builder.endToken; + force = builder.force; + validate = builder.validate; + } + + @Nullable @JsonProperty(TABLES) + public List tables() + { + return tables; + } + + @JsonProperty(IS_PRIMARY_RANGE) + public Boolean isPrimaryRange() + { + return isPrimaryRange; + } + + @JsonProperty(DATACENTER) + public String datacenter() + { + return datacenter; + } + + @JsonProperty(HOSTS) + public List hosts() + { + return hosts; + } + + @JsonProperty(START_TOKEN) + public String startToken() + { + return startToken; + } + + @JsonProperty(END_TOKEN) + public String endToken() + { + return endToken; + } + + @JsonProperty(REPAIR_TYPE) + public RepairType repairType() + { + return repairType; + } + + @Nullable @JsonProperty(FORCE) + public Boolean force() + { + return force; + } + + @JsonProperty(VALIDATE) + public Boolean shouldValidate() + { + return validate; + } + + /** + * Enum representing types of repair supported + */ + public enum RepairType + { + FULL, + INCREMENTAL; + + @JsonValue + public String getValue() + { + return name().toLowerCase(); + } + + @JsonCreator + public static RepairType 
fromValue(@NotNull String text) + { + String trimmed = text.trim(); + try + { + if (trimmed.isEmpty()) + { + throw new IllegalArgumentException("Unexpected value: " + text); + } + + return valueOf(trimmed.toUpperCase()); + } + catch (IllegalArgumentException e) + { + throw new IllegalArgumentException("Unexpected value: " + text); + } + } + } + + /** + * Builder implementation to construct a {@code RepairPayload} + */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static final class Builder implements DataObjectBuilder + { + private List tables; + private Boolean isPrimaryRange; + private String datacenter; + private List hosts; + private String startToken; + private String endToken; + private RepairType repairType; + private Boolean force; + private Boolean validate; + + private Builder() + { + } + + @Override + public RepairPayload.Builder self() + { + return this; + } + + @JsonProperty(TABLES) + public RepairPayload.Builder tables(List tables) + { + return update(b -> b.tables = tables); + } + + @JsonProperty(IS_PRIMARY_RANGE) + public RepairPayload.Builder isPrimaryRange(boolean isPrimaryRange) + { + return update(b -> b.isPrimaryRange = isPrimaryRange); + } + + @JsonProperty(DATACENTER) + public RepairPayload.Builder datacenter(String datacenter) + { + return update(b -> b.datacenter = datacenter); + } + + @JsonProperty(REPAIR_TYPE) + public RepairPayload.Builder repairType(RepairType type) + { + return update(b -> b.repairType = type); + } + + @JsonProperty(HOSTS) + public RepairPayload.Builder hosts(List hosts) + { + return update(b -> b.hosts = hosts); + } + + @JsonProperty(START_TOKEN) + public RepairPayload.Builder startToken(String startToken) + { + return update(b -> b.startToken = startToken); + } + + @JsonProperty(END_TOKEN) + public RepairPayload.Builder endToken(String endToken) + { + return update(b -> b.endToken = endToken); + } + + @JsonProperty(FORCE) + public RepairPayload.Builder force(boolean force) + { + return update(b -> b.force = 
force); + } + + @JsonProperty(VALIDATE) + public RepairPayload.Builder shouldValidate(boolean validate) + { + return update(b -> b.validate = validate); + } + + @Override + public RepairPayload build() + { + return new RepairPayload(this); + } + } +} diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index 04d0298c1..f6d0c0fd9 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -46,6 +46,7 @@ import org.apache.cassandra.sidecar.common.request.ListCdcSegmentsRequest; import org.apache.cassandra.sidecar.common.request.LiveMigrationListInstanceFilesRequest; import org.apache.cassandra.sidecar.common.request.LiveMigrationStatusRequest; +import org.apache.cassandra.sidecar.common.request.RepairRequest; import org.apache.cassandra.sidecar.common.request.RestoreJobProgressRequest; import org.apache.cassandra.sidecar.common.request.RestoreJobSummaryRequest; import org.apache.cassandra.sidecar.common.request.Service; @@ -59,6 +60,7 @@ import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload; import org.apache.cassandra.sidecar.common.request.data.Digest; import org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; import org.apache.cassandra.sidecar.common.request.data.RestoreJobProgressRequestParams; import org.apache.cassandra.sidecar.common.request.data.UpdateCdcServiceConfigPayload; import org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload; @@ -841,6 +843,23 @@ public CompletableFuture nodeDecommission(SidecarInstanc .build()); } + /** + * Executes a repair operation on the provided instance, keyspace and options + * @param instance the instance where the request will be executed + * @param 
keyspace keyspace for which the repair is being performed + * @param payload the repair request options as payload + * @return a completable future of the repair operation job response + */ + public CompletableFuture repair(SidecarInstance instance, + String keyspace, + RepairPayload payload) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .request(new RepairRequest(keyspace, payload)) + .build()); + } + /** * Executes the node drain request using the default retry policy and configured selection policy * diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index 7cc117c34..049daa350 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -75,6 +75,7 @@ import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.request.data.MD5Digest; import org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; import org.apache.cassandra.sidecar.common.request.data.UpdateCdcServiceConfigPayload; import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; import org.apache.cassandra.sidecar.common.response.CompactionStatsResponse; @@ -1468,6 +1469,29 @@ public void testNodeDrain() throws Exception validateResponseServed(ApiEndpointsV1.NODE_DRAIN_ROUTE); } + @Test + public void testRepair() throws Exception + { + UUID jobId = UUID.randomUUID(); + String repairResponseString = "{\"jobId\":\"" + jobId + "\",\"jobStatus\":\"SUCCEEDED\",\"instance\":\"127.0.0.1\"}"; + + MockResponse response = new MockResponse() + .setResponseCode(OK.code()) + .setHeader("content-type", 
"application/json") + .setBody(repairResponseString); + enqueue(response); + RepairPayload payload = RepairPayload.builder() + .tables(Collections.singletonList("test_table")) + .isPrimaryRange(true) + .build(); + + SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(servers.get(0)); + OperationalJobResponse result = client.repair(sidecarInstance, "testkeyspace", payload).get(30, TimeUnit.SECONDS); + assertThat(result).isNotNull(); + assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + validateResponseServed(ApiEndpointsV1.REPAIR_ROUTE.replaceAll(KEYSPACE_PATH_PARAM, "testkeyspace")); + } + @Test void testFailsWithOneAttemptPerServer() { diff --git a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java index 101a40ae4..5cdaf57a2 100644 --- a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java +++ b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -118,6 +119,7 @@ import static org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_API_CALL_TIMEOUT; import static org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS; import static org.apache.cassandra.testing.DriverTestUtils.buildContactPoints; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; import static org.apache.cassandra.testing.utils.IInstanceUtils.tryGetIntConfig; import static 
org.assertj.core.api.Assertions.assertThat; @@ -162,7 +164,7 @@ public abstract class SharedClusterIntegrationTestBase { protected final Logger logger = LoggerFactory.getLogger(SharedClusterIntegrationTestBase.class); private static final int MAX_CLUSTER_PROVISION_RETRIES = 5; - + private static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0); @TempDir static Path secretsPath; @@ -364,6 +366,14 @@ protected void createTestTable(Consumer queryExecution, QualifiedName na queryExecution.accept(String.format(createTableStatement, name)); } + protected QualifiedName createUniqueTestTable(String tablePrefix, String createTableStatement) + { + String uniqueTableName = tablePrefix + TEST_TABLE_ID.getAndIncrement(); + QualifiedName table = new QualifiedName(TEST_KEYSPACE, uniqueTableName); + cluster.schemaChangeIgnoringStoppedInstances(String.format(createTableStatement, table)); + return table; + } + /** * Override to provide additional options to configure sidecar * diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/ClearSnapshotIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/ClearSnapshotIntegrationTest.java index a8d198f8e..d9e4e4e7b 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/ClearSnapshotIntegrationTest.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/ClearSnapshotIntegrationTest.java @@ -19,6 +19,8 @@ package org.apache.cassandra.sidecar.routes; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; import java.nio.file.Path; import java.util.List; import java.util.UUID; @@ -86,8 +88,8 @@ private void testSnapshotCreateAndDelete(QualifiedName tableName) { String snapshotName = "my-snapshot-" + UUID.randomUUID(); String testRoute = String.format(SNAPSHOT_ROUTE_TEMPLATE, - tableName.maybeQuotedKeyspace(), - tableName.maybeQuotedTable(), + 
urlEncode(tableName.maybeQuotedKeyspace()), + urlEncode(tableName.maybeQuotedTable()), snapshotName); // Create the snapshot @@ -119,6 +121,18 @@ private void assertNotFoundOnDeleteSnapshot(String testRoute) assertThat(response.statusCode()).isEqualTo(NOT_FOUND.code()); } + private String urlEncode(String value) + { + try + { + return URLEncoder.encode(value, "UTF-8"); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException("UTF-8 encoding not supported", e); + } + } + @Override protected void initializeSchemaForTest() { diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java new file mode 100644 index 000000000..1d6669349 --- /dev/null +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpResponseExpectation; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.codec.BodyCodec; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; + +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +import static org.apache.cassandra.testing.TestUtils.DC1_RF3; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.TEST_TABLE_PREFIX; +import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking; +import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + + +/** + * Integration tests for repair operations + */ 
+@Tag("heavy") +public class RepairIntegrationTest extends SharedClusterSidecarIntegrationTestBase +{ + + private static final String INSERT_STMT = "INSERT INTO %s (race_year, race_name, rank, cyclist_name) " + + "VALUES (2015, 'Tour of Japan - Stage 4 - Minami > Shinshu', %d, 'Benjamin PRADES');"; + + private static final String SELECT_STMT = "SELECT * FROM %s;"; + + private static final String CREATE_STMT = "CREATE TABLE %s ( \n" + + " race_year int, \n" + + " race_name text, \n" + + " cyclist_name text, \n" + + " rank int, \n" + + " PRIMARY KEY ((race_year, race_name), rank) \n" + + ") WITH read_repair='NONE';"; + + private static final int NUM_ROWS = 300; + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration().nodesPerDc(3); + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF3); + } + + @Test + void repairTest(VertxTestContext context) + { + QualifiedName table = createUniqueTestTable(TEST_TABLE_PREFIX, CREATE_STMT); + populateData(table, 10); + RepairPayload payload = RepairPayload.builder() + .isPrimaryRange(true) + .tables(List.of(table.table())) + .build(); + String testRoute = "/api/v1/cassandra/keyspaces/" + TEST_KEYSPACE + "/repair"; + OperationalJobResponse response = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", testRoute) + .as(BodyCodec.json(OperationalJobResponse.class)) + .sendJson(JsonObject.mapFrom(payload)) + .expecting(HttpResponseExpectation.SC_ACCEPTED)) + .body(); + + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); + assertThat(response).isNotNull(); + assertThat(response.jobId()).isNotNull(); + pollStatusForState(response.jobId().toString(), SUCCEEDED, null); + context.completeNow(); + } + + @Test + void repairReplicasWithData(VertxTestContext context) + { + QualifiedName table = createUniqueTestTable(TEST_TABLE_PREFIX, CREATE_STMT); + cluster.stream().forEach(node -> 
node.nodetoolResult("disableautocompaction", table.keyspace(), table.table()) + .asserts().success()); + populateDataSingleNode(table, 2, NUM_ROWS); + validateDataConsistency(table, List.of(2), NUM_ROWS); + validateDataConsistency(table, List.of(3, 1), 0); + + RepairPayload payload = RepairPayload.builder() + .isPrimaryRange(true) + .tables(List.of(table.table())) + .build(); + String testRoute = "/api/v1/cassandra/keyspaces/" + TEST_KEYSPACE + "/repair"; + OperationalJobResponse response = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", testRoute) + .as(BodyCodec.json(OperationalJobResponse.class)) + .sendJson(JsonObject.mapFrom(payload)) + .expecting(HttpResponseExpectation.SC_ACCEPTED)) + .body(); + + assertThat(response).isNotNull(); + assertThat(response.jobId()).isNotNull(); + assertThat(response.status()).isEqualTo(RUNNING); + + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); + pollStatusForState(response.jobId().toString(), SUCCEEDED, null); + validateDataConsistency(table, List.of(2, 3, 1), NUM_ROWS); + context.completeNow(); + } + + @Test + void repairReplicasIRWithData(VertxTestContext context) + { + QualifiedName table = createUniqueTestTable(TEST_TABLE_PREFIX, CREATE_STMT); + cluster.stream().forEach(node -> node.nodetoolResult("disableautocompaction", table.keyspace(), table.table()) + .asserts().success()); + populateDataSingleNode(table, 2, NUM_ROWS); + validateDataConsistency(table, List.of(2), NUM_ROWS); + validateDataConsistency(table, List.of(3, 1), 0); + + RepairPayload payload = RepairPayload.builder() + .tables(List.of(table.table())) + .repairType(RepairPayload.RepairType.INCREMENTAL) + .build(); + String testRoute = "/api/v1/cassandra/keyspaces/" + TEST_KEYSPACE + "/repair"; + OperationalJobResponse response = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", testRoute) + .as(BodyCodec.json(OperationalJobResponse.class)) + .sendJson(JsonObject.mapFrom(payload)) + 
.expecting(HttpResponseExpectation.SC_ACCEPTED)) + .body(); + + assertThat(response).isNotNull(); + assertThat(response.jobId()).isNotNull(); + assertThat(response.status()).isEqualTo(RUNNING); + + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); + pollStatusForState(response.jobId().toString(), SUCCEEDED, null); + validateDataConsistency(table, List.of(2, 3, 1), NUM_ROWS); + context.completeNow(); + } + + @Test + void repairReplicasTimeout(VertxTestContext context) + { + QualifiedName table = createUniqueTestTable(TEST_TABLE_PREFIX, CREATE_STMT); + cluster.stream().forEach(node -> node.nodetoolResult("disableautocompaction", table.keyspace(), table.table()) + .asserts().success()); + populateDataSingleNode(table, 2, NUM_ROWS); + validateDataConsistency(table, List.of(2), NUM_ROWS); + validateDataConsistency(table, List.of(3, 1), 0); + + RepairPayload payload = RepairPayload.builder() + .isPrimaryRange(true) + .tables(List.of(table.table())) + .build(); + String testRoute = "/api/v1/cassandra/keyspaces/" + TEST_KEYSPACE + "/repair"; + OperationalJobResponse response = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", testRoute) + .as(BodyCodec.json(OperationalJobResponse.class)) + .sendJson(JsonObject.mapFrom(payload)) + .expecting(HttpResponseExpectation.SC_ACCEPTED)) + .body(); + + assertThat(response).isNotNull(); + assertThat(response.jobId()).isNotNull(); + assertThat(response.status()).isEqualTo(RUNNING); + + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); + pollStatusForState(response.jobId().toString(), SUCCEEDED, null); + validateDataConsistency(table, List.of(2, 3, 1), NUM_ROWS); + context.completeNow(); + } + + private void pollStatusForState(String uuid, + OperationalJobStatus expectedStatus, + String expectedReason) + { + String status = "/api/v1/cassandra/operational-jobs/" + uuid; + AtomicBoolean stateReached = new AtomicBoolean(false); + AtomicInteger counter = new AtomicInteger(0); + loopAssert(30, () 
-> { + counter.incrementAndGet(); + HttpResponse resp; + resp = getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", status) + .send()); + logger.info("Success Status Response code: {}", resp.statusCode()); + logger.info("Status Response: {}", resp.bodyAsString()); + if (resp.statusCode() == HttpResponseStatus.OK.code()) + { + stateReached.set(true); + OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class); + assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid)); + assertThat(jobStatusResp.status()).isEqualTo(expectedStatus); + assertThat(jobStatusResp.reason()).isEqualTo(expectedReason); + assertThat(jobStatusResp.operation()).isEqualTo("repair"); + } + else + { + assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.ACCEPTED.code()); + OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class); + assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid)); + } + logger.info("Request completed"); + assertThat(stateReached.get()).isTrue(); + }); + } + + private void validateDataConsistency(QualifiedName tableName, List nodes, long expectedNumRows) + { + + nodes.forEach(nodeNumber -> { + IInstance node = cluster.get(nodeNumber); + SimpleQueryResult rows = node.executeInternalWithResult(String.format(SELECT_STMT, tableName)); + long rowCount = StreamSupport.stream(rows.spliterator(), false).count(); + assertThat(expectedNumRows).isEqualTo(rowCount); + }); + } + + private void populateDataSingleNode(QualifiedName tableName, int nodeNumber, int numRows) + { + IInstance node = cluster.get(nodeNumber); + IntStream.rangeClosed(1, numRows) + .forEach(i -> { + node.executeInternal(String.format(INSERT_STMT, tableName, i)); + node.flush(TEST_KEYSPACE); + }); + } + + private void populateData(QualifiedName tableName, int numRows) + { + IntStream.rangeClosed(1, numRows) + .mapToObj(i -> String.format(INSERT_STMT, tableName, i)) + 
.forEach(cluster::schemaChangeIgnoringStoppedInstances); + } +} diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java index 021afa521..3a4c905a5 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java @@ -135,6 +135,24 @@ default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String tab */ String clusterName(); + /** + * Triggers a repair operation for the given keyspace and options + * + * @param keyspace keyspace for the repair operation + * @param options repair options + * @return an integer value representing the status of the repair operation + * which can be used as a reference to check for the status of the repair session via {@link #getParentRepairStatus(int)}. + */ + int repair(String keyspace, Map options); + + /** + * Get the status of a given parent repair session. + * + * @param cmd the integer value representing a reference to a repair session` + * @return status of parent repair + */ + List getParentRepairStatus(int cmd); + /** * Triggers stop native transport of the Cassandra node */ diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/KeyspaceNotFoundException.java b/server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/KeyspaceNotFoundException.java new file mode 100644 index 000000000..36fee43a2 --- /dev/null +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/KeyspaceNotFoundException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.exceptions; + +/** + * Exception thrown when a keyspace does not exist in Cassandra. + * For instance, when attempting to access a keyspace that has been dropped + * or never existed in the connected Cassandra cluster. + */ +public class KeyspaceNotFoundException extends RuntimeException +{ + public KeyspaceNotFoundException(String keyspace) + { + super(makeErrorMessage(keyspace)); + } + + public KeyspaceNotFoundException(String keyspace, Throwable cause) + { + super(makeErrorMessage(keyspace), cause); + } + + private static String makeErrorMessage(String keyspace) + { + return "Keyspace " + keyspace + " was not found"; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java index 2195122ef..86e39de81 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java @@ -65,6 +65,7 @@ public class BasicPermissions // sidecar operation related permissions public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE); public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", 
OPERATION_SCOPE); + public static final Permission REPAIR = new StandardPermission("REPAIR", OPERATION_SCOPE); public static final Permission DRAIN_NODE = new DomainAwarePermission("NODE:DRAIN", OPERATION_SCOPE); // Permissions related to Schema Reporting diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/RepairJobsConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/RepairJobsConfiguration.java new file mode 100644 index 000000000..b9829365c --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/RepairJobsConfiguration.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.config; + +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; + +/** + * Configuration for Repair jobs + */ +public interface RepairJobsConfiguration +{ + /** + * @return the max retry attempts for the repair job status to be valid + */ + int repairStatusMaxAttempts(); + + /** + * @return the polling interval (in milliseconds) that checks for the status of the repair job + */ + MillisecondBoundConfiguration repairPollInterval(); +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java index 99e306e8e..9f1cf6f58 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java @@ -120,6 +120,11 @@ default List listenSocketAddresses() */ ThrottleConfiguration throttleConfiguration(); + /** + * @return the configuration for repair + */ + RepairJobsConfiguration repairConfiguration(); + /** * @return the configuration for SSTable component uploads on this service */ diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/RepairJobsConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/RepairJobsConfigurationImpl.java new file mode 100644 index 000000000..74d826f51 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/RepairJobsConfigurationImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.config.yaml; + +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.RepairJobsConfiguration; + +/** + * Configuration for Repair jobs + */ +public class RepairJobsConfigurationImpl implements RepairJobsConfiguration +{ + public static final int DEFAULT_VALID_REPAIR_STATUS_ATTEMPTS = 5; + public static final long DEFAULT_REPAIR_STATUS_POLLING_INTERVAL_MILLIS = 2_000L; + + @JsonProperty(value = "repair_status_attempts", defaultValue = DEFAULT_VALID_REPAIR_STATUS_ATTEMPTS + "") + protected final int validRepairStatusAttempts; + + @JsonProperty(value = "repair_status_polling_interval", defaultValue = DEFAULT_REPAIR_STATUS_POLLING_INTERVAL_MILLIS + "") + protected final long repairStatusPollIntervalMillis; + + /** + * Default constructor that sets default values + */ + public RepairJobsConfigurationImpl() + { + this(DEFAULT_VALID_REPAIR_STATUS_ATTEMPTS, DEFAULT_REPAIR_STATUS_POLLING_INTERVAL_MILLIS); + } + + /** + * Constructor with parameters for JSON deserialization + * + * @param validRepairStatusAttempts the max retry attempts for the repair job status to be valid + * @param repairStatusPollIntervalMillis the polling interval for repair job status in milliseconds + */ + @JsonCreator + public RepairJobsConfigurationImpl( + @JsonProperty(value = "repair_status_attempts", defaultValue = 
DEFAULT_VALID_REPAIR_STATUS_ATTEMPTS + "") + int validRepairStatusAttempts, + @JsonProperty(value = "repair_status_polling_interval", defaultValue = DEFAULT_REPAIR_STATUS_POLLING_INTERVAL_MILLIS + "") + long repairStatusPollIntervalMillis) + { + this.validRepairStatusAttempts = validRepairStatusAttempts; + this.repairStatusPollIntervalMillis = repairStatusPollIntervalMillis; + } + + @Override + public int repairStatusMaxAttempts() + { + return validRepairStatusAttempts; + } + + @Override + public MillisecondBoundConfiguration repairPollInterval() + { + return new MillisecondBoundConfiguration(repairStatusPollIntervalMillis, TimeUnit.MILLISECONDS); + } + + +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java index a924d1009..a0b9c2210 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java @@ -35,6 +35,7 @@ import org.apache.cassandra.sidecar.config.CdcConfiguration; import org.apache.cassandra.sidecar.config.CoordinationConfiguration; import org.apache.cassandra.sidecar.config.JmxConfiguration; +import org.apache.cassandra.sidecar.config.RepairJobsConfiguration; import org.apache.cassandra.sidecar.config.SSTableImportConfiguration; import org.apache.cassandra.sidecar.config.SSTableSnapshotConfiguration; import org.apache.cassandra.sidecar.config.SSTableUploadConfiguration; @@ -68,11 +69,13 @@ public class ServiceConfigurationImpl implements ServiceConfiguration public static final MinuteBoundConfiguration DEFAULT_ALLOWABLE_TIME_SKEW = MinuteBoundConfiguration.parse("1h"); private static final String SERVER_VERTICLE_INSTANCES_PROPERTY = "server_verticle_instances"; private static final String OPERATIONAL_JOB_TRACKER_SIZE_PROPERTY = "operations_job_tracker_size"; - private 
static final String OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_PROPERTY = "operations_job_sync_response_timeout"; + public static final String OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_PROPERTY = "operations_job_execution_max_wait_time"; private static final int DEFAULT_SERVER_VERTICLE_INSTANCES = 1; private static final int DEFAULT_OPERATIONAL_JOB_TRACKER_SIZE = 64; private static final MillisecondBoundConfiguration DEFAULT_OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME = MillisecondBoundConfiguration.parse("5s"); + private static final MillisecondBoundConfiguration MAX_OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME = + MillisecondBoundConfiguration.parse("1m"); public static final String THROTTLE_PROPERTY = "throttle"; public static final String SSTABLE_UPLOAD_PROPERTY = "sstable_upload"; public static final String SSTABLE_IMPORT_PROPERTY = "sstable_import"; @@ -80,6 +83,7 @@ public class ServiceConfigurationImpl implements ServiceConfiguration public static final String WORKER_POOLS_PROPERTY = "worker_pools"; private static final String JMX_PROPERTY = "jmx"; private static final String TRAFFIC_SHAPING_PROPERTY = "traffic_shaping"; + private static final String REPAIR_PROPERTY = "repair"; private static final String SCHEMA = "schema"; private static final String CDC = "cdc"; private static final String COORDINATION = "coordination"; @@ -119,8 +123,7 @@ public class ServiceConfigurationImpl implements ServiceConfiguration @JsonProperty(value = OPERATIONAL_JOB_TRACKER_SIZE_PROPERTY, defaultValue = DEFAULT_OPERATIONAL_JOB_TRACKER_SIZE + "") protected final int operationalJobTrackerSize; - @JsonProperty(value = OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_PROPERTY) - protected final MillisecondBoundConfiguration operationalJobExecutionMaxWaitTime; + protected MillisecondBoundConfiguration operationalJobExecutionMaxWaitTime; @JsonProperty(value = THROTTLE_PROPERTY) protected final ThrottleConfiguration throttleConfiguration; @@ -143,6 +146,9 @@ public class ServiceConfigurationImpl 
implements ServiceConfiguration @JsonProperty(value = TRAFFIC_SHAPING_PROPERTY) protected final TrafficShapingConfiguration trafficShapingConfiguration; + @JsonProperty(value = REPAIR_PROPERTY) + protected final RepairJobsConfiguration repairJobsConfiguration; + @JsonProperty(value = SCHEMA) protected final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration; @@ -180,6 +186,7 @@ protected ServiceConfigurationImpl(Builder builder) serverVerticleInstances = builder.serverVerticleInstances; operationalJobTrackerSize = builder.operationalJobTrackerSize; operationalJobExecutionMaxWaitTime = builder.operationalJobExecutionMaxWaitTime; + repairJobsConfiguration = builder.repairJobsConfiguration; throttleConfiguration = builder.throttleConfiguration; sstableUploadConfiguration = builder.sstableUploadConfiguration; sstableImportConfiguration = builder.sstableImportConfiguration; @@ -352,12 +359,24 @@ public int operationalJobTrackerSize() * {@inheritDoc} */ @Override - @JsonProperty(value = OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_PROPERTY) public MillisecondBoundConfiguration operationalJobExecutionMaxWaitTime() { return operationalJobExecutionMaxWaitTime; } + @JsonProperty(value = OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_PROPERTY) + public void setOperationalJobExecutionMaxWaitTime(MillisecondBoundConfiguration operationalJobExecutionMaxWaitTime) + { + if (operationalJobExecutionMaxWaitTime.compareTo(MAX_OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME) > 0) + { + throw new ConfigurationException(String.format("Invalid %s value (%s). 
The maximum allowed value is %s.", + OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_PROPERTY, + operationalJobExecutionMaxWaitTime, + MAX_OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME)); + } + this.operationalJobExecutionMaxWaitTime = operationalJobExecutionMaxWaitTime; + } + /** * {@inheritDoc} */ @@ -428,6 +447,16 @@ public TrafficShapingConfiguration trafficShapingConfiguration() return trafficShapingConfiguration; } + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = REPAIR_PROPERTY) + public RepairJobsConfiguration repairConfiguration() + { + return repairJobsConfiguration; + } + /** * {@inheritDoc} */ @@ -483,6 +512,7 @@ public static class Builder implements DataObjectBuilder b.schemaKeyspaceConfiguration = schemaKeyspaceConfiguration); } + /** + * Sets the {@code repairConfiguration} and returns a reference to this Builder enabling method + * chaining. + * + * @param repairJobsConfiguration the {@code repairConfiguration} to set + * @return a reference to this Builder + */ + public Builder repairConfiguration(RepairJobsConfiguration repairJobsConfiguration) + { + return update(b -> b.repairJobsConfiguration = repairJobsConfiguration); + } + /** * Set the {@code cdcConfiguration} and returns a reference to this Builder enabling * method chaining. diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/RepairHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/RepairHandler.java new file mode 100644 index 000000000..17d595aa0 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/RepairHandler.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.Set; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.data.Name; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.job.RepairJob; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.OperationalJobUtils; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for triggering repair + */ +public class RepairHandler extends AbstractHandler implements 
AccessProtected +{ + private final ServiceConfiguration config; + private final OperationalJobManager jobManager; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the metadata fetcher + * @param executorPools executor pools for blocking executions + * @param serviceConfiguration configuration object holding config details of Sidecar + * @param validator a validator instance to validate Cassandra-specific input + * @param jobManager manager for long-running operational jobs + */ + @Inject + protected RepairHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + ServiceConfiguration serviceConfiguration, + CassandraInputValidator validator, + OperationalJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + this.config = serviceConfiguration; + } + + /** + * {@inheritDoc} + */ + @Override + protected RepairRequestParam extractParamsOrThrow(RoutingContext context) + { + Name keyspace = keyspace(context, true); + if (keyspace == null) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "'keyspace' is required but not supplied"); + } + + String bodyString = context.body().asString(); + if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json encoder writes null as "null" + { + logger.warn("Bad request to create repair job. Received null payload."); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Unexpected null payload for request"); + } + + RepairPayload payload; + try + { + payload = Json.decodeValue(bodyString, RepairPayload.class); + } + catch (DecodeException decodeException) + { + logger.warn("Bad request to create repair job. 
Received invalid JSON payload."); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Invalid request payload", + decodeException); + } + + // Validate token range if provided + validateTokenRange(payload); + + return RepairRequestParam.from(keyspace, payload); + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + RepairRequestParam repairRequestParam) + { + StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); + RepairJob job = new RepairJob(executorPools.internal(), config.repairConfiguration(), UUIDs.timeBased(), operations, repairRequestParam); + + this.jobManager.trySubmitJob(job, + (completedJob, exception) -> + OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception), + executorPools.service(), + config.operationalJobExecutionMaxWaitTime()); + } + + /** + * Validates the token range in the repair payload if both start and end tokens are provided. + * + * @param payload the repair payload to validate + * @throws HttpException if the token range is invalid + */ + private void validateTokenRange(RepairPayload payload) + { + if (payload.startToken() != null && payload.endToken() != null) + { + try + { + String startTokenStr = payload.startToken(); + String endTokenStr = payload.endToken(); + + // Validate tokens using BigInteger for proper numeric comparison + BigInteger startToken = new BigInteger(startTokenStr); + BigInteger endToken = new BigInteger(endTokenStr); + + if (startToken.compareTo(endToken) >= 0) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Start token must be less than end token. Got start: " + + startTokenStr + ", end: " + endTokenStr); + } + } + catch (NumberFormatException e) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Invalid token format. 
Tokens must be numeric values.", e); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public Set requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.REPAIR.toAuthorization()); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/RepairRequestParam.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/RepairRequestParam.java new file mode 100644 index 000000000..154df8503 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/RepairRequestParam.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers.data; + + +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.server.data.Name; + +/** + * Holder class for the {@link org.apache.cassandra.sidecar.handlers.RepairHandler} + * request parameters + */ +public class RepairRequestParam +{ + private final Name keyspace; + private final RepairPayload requestPayload; + + private RepairRequestParam(Name keyspace, RepairPayload requestPayload) + { + + this.keyspace = keyspace; + this.requestPayload = requestPayload; + } + + public static RepairRequestParam from(Name keyspace, RepairPayload requestPayload) + { + return new RepairRequestParam(keyspace, requestPayload); + } + + /** + * @return the keyspace in Cassandra + */ + public Name keyspace() + { + return keyspace; + } + + /** + * @return the Repair request payload + */ + public RepairPayload requestPayload() + { + return requestPayload; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateKeyspaceExistenceHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateKeyspaceExistenceHandler.java new file mode 100644 index 000000000..0a3fcf7bd --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateKeyspaceExistenceHandler.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.validations; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.server.data.Name; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.routes.RoutingContextUtils; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Validates that the requested keyspace exists in Cassandra, when the endpoint + * contains keyspace name. 
+ * On successful validation, it stores the fetched {@link KeyspaceMetadata} + * in the {@link RoutingContext} + */ +@Singleton +public class ValidateKeyspaceExistenceHandler extends AbstractHandler +{ + @Inject + public ValidateKeyspaceExistenceHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator) + { + super(metadataFetcher, executorPools, validator); + } + + @Override + protected Name extractParamsOrThrow(RoutingContext context) + { + return keyspace(context, false); + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + Name keyspace) + { + if (keyspace == null) + { + context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Keyspace parameter is required but not provided")); + return; + } + + ValidationUtils.requireKeyspaceExists(metadataFetcher, executorPools, host, keyspace.maybeQuotedName()) + .onComplete(ar -> { + if (ar.succeeded()) + { + // Store metadata in context + KeyspaceMetadata metadata = ar.result(); + RoutingContextUtils.put(context, RoutingContextUtils.SC_KEYSPACE_METADATA, metadata); + context.next(); + } + else + { + // Handle failure + if (ar.cause().getMessage().contains("not found")) + { + context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, ar.cause().getMessage())); + } + else + { + context.fail(ar.cause()); + } + } + }); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java index 4c0894813..c0c206df0 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java @@ -23,7 +23,6 @@ import com.google.inject.Inject; 
import com.google.inject.Singleton; import io.netty.handler.codec.http.HttpResponseStatus; -import io.vertx.core.Future; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; @@ -76,16 +75,24 @@ protected void handleInternal(RoutingContext context, return; } - getKeyspaceMetadata(host, input.maybeQuotedKeyspace()) - .onFailure(context::fail) // fail the request with the internal server error thrown from getKeyspaceMetadata - .onSuccess(keyspaceMetadata -> { - if (keyspaceMetadata == null) + ValidationUtils.requireKeyspaceExists(metadataFetcher, executorPools, host, input.maybeQuotedKeyspace()) + .onComplete(ar -> { + if (ar.failed()) { - context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, - "Keyspace " + input.keyspace() + " was not found")); + // Handle failure + if (ar.cause().getMessage().contains("not found")) + { + context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, ar.cause().getMessage())); + } + else + { + context.fail(ar.cause()); + } return; } - + + // Store metadata in context + KeyspaceMetadata keyspaceMetadata = ar.result(); RoutingContextUtils.put(context, RoutingContextUtils.SC_KEYSPACE_METADATA, keyspaceMetadata); String table = input.maybeQuotedTableName(); @@ -95,26 +102,25 @@ protected void handleInternal(RoutingContext context, return; } - TableMetadata tableMetadata = keyspaceMetadata.getTable(table); - if (tableMetadata == null) + try { - String errMsg = "Table " + input.tableName() + " was not found for keyspace " + input.keyspace(); - context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, errMsg)); + TableMetadata tableMetadata = keyspaceMetadata.getTable(table); + if (tableMetadata == null) + { + String errMsg = "Table " + input.tableName() + " was not found for keyspace " + input.keyspace(); + context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, errMsg)); + } + else + { + RoutingContextUtils.put(context, 
RoutingContextUtils.SC_TABLE_METADATA, tableMetadata); + context.next(); + } } - else + catch (Exception e) { - RoutingContextUtils.put(context, RoutingContextUtils.SC_TABLE_METADATA, tableMetadata); - // keyspace / [table] exists - context.next(); + context.fail(wrapHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR, + "Error validating table existence", e)); } }); } - - private Future getKeyspaceMetadata(String host, String keyspace) - { - return executorPools.service().executeBlocking(() -> metadataFetcher.instance(host) - .delegate() - .metadata() - .getKeyspace(keyspace)); - } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidationUtils.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidationUtils.java new file mode 100644 index 000000000..857dd3ec6 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidationUtils.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers.validations; + +import com.datastax.driver.core.KeyspaceMetadata; +import io.vertx.core.Future; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.KeyspaceNotFoundException; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +/** + * Utility class for validation handlers that check the existence of Cassandra schema elements. + */ +public class ValidationUtils +{ + private ValidationUtils() + { + // Utility class, not meant to be instantiated + } + + /** + * Fetches keyspace metadata from Cassandra for the given host and keyspace name. + * + * @param metadataFetcher the metadata fetcher + * @param executorPools the executor pools + * @param host the host to fetch metadata from + * @param keyspace the keyspace name + * @return a Future containing the KeyspaceMetadata, or null if the keyspace doesn't exist + */ + public static Future getKeyspaceMetadata(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + String host, + String keyspace) + { + return executorPools.service().executeBlocking(() -> metadataFetcher.instance(host) + .delegate() + .metadata() + .getKeyspace(keyspace)); + } + + /** + * Validates that a keyspace exists. 
+ * + * @param metadataFetcher the metadata fetcher + * @param executorPools the executor pools + * @param host the host to validate against + * @param keyspace the keyspace name to validate + * @return a Future that completes with the KeyspaceMetadata if the keyspace exists, + * or fails with an error if the keyspace doesn't exist or an error occurs + */ + public static Future requireKeyspaceExists(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + String host, + String keyspace) + { + return getKeyspaceMetadata(metadataFetcher, executorPools, host, keyspace) + .compose(keyspaceMetadata -> { + if (keyspaceMetadata == null) + { + return Future.failedFuture(new KeyspaceNotFoundException(keyspace)); + } + else + { + return Future.succeededFuture(keyspaceMetadata); + } + }); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java index 32a05fa96..515516e74 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java @@ -18,13 +18,17 @@ package org.apache.cassandra.sidecar.job; +import java.util.Collections; +import java.util.List; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.vertx.core.Future; import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.jetbrains.annotations.NotNull; /** * Implementation of {@link OperationalJob} to perform node decommission operation. @@ -43,8 +47,14 @@ public NodeDecommissionJob(UUID jobId, StorageOperations storageOps, boolean isF this.isForce = isForce; } + /** + * Node Decommission job determines conflict based on the current operation mode of the cluster. 
+ * If the cluster is in the middle of a decommission or leaving operation, the job will be rejected. + * @param sameOperationJobs jobs tracked by the tracker are not relevant for this conflict check + * @return true if the job has a conflict, false otherwise + */ @Override - public boolean isRunningOnCassandra() + public boolean hasConflict(@NotNull List sameOperationJobs) { String operationMode = storageOperations.operationMode(); return "LEAVING".equals(operationMode) || "DECOMMISSIONED".equals(operationMode); @@ -76,16 +86,17 @@ else if ("DECOMMISSIONED".equals(operationMode)) * {@inheritDoc} */ @Override - protected void executeInternal() + protected Future executeInternal() { - if (isRunningOnCassandra()) + if (hasConflict(Collections.emptyList())) { LOGGER.info("Not executing job as an ongoing or completed decommission operation was found jobId={}", this.jobId()); - return; + return Future.succeededFuture(); } LOGGER.info("Executing decommission operation. jobId={}", this.jobId()); storageOperations.decommission(isForce); + return Future.succeededFuture(); } /** diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java index d64857646..4f8f6abdb 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java @@ -19,15 +19,19 @@ package org.apache.cassandra.sidecar.job; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.vertx.core.Future; import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; +import 
org.jetbrains.annotations.NotNull; /** * Implementation of {@link OperationalJob} to perform node drain operation. @@ -84,7 +88,7 @@ public NodeDrainJob(UUID jobId, StorageOperations storageOps) * {@inheritDoc} */ @Override - public boolean isRunningOnCassandra() + public boolean hasConflict(@NotNull List sameOperationJobs) { String operationMode = storageOperations.operationMode(); NodeDrainStateEnum nodeDrainStateEnum = NodeDrainStateEnum.fromOperationMode(operationMode); @@ -111,12 +115,12 @@ public OperationalJobStatus status() * {@inheritDoc} */ @Override - protected void executeInternal() + protected Future executeInternal() { - if (isRunningOnCassandra()) + if (hasConflict(Collections.emptyList())) { LOGGER.info("Not executing job as an ongoing drain operation was found jobId={}", this.jobId()); - return; + return Future.succeededFuture(); } LOGGER.info("Executing drain operation. jobId={}", this.jobId()); @@ -129,6 +133,8 @@ protected void executeInternal() LOGGER.error("Job execution failed. 
jobId={} reason='{}'", this.jobId(), ex.getMessage(), ex); throw OperationalJobException.wraps(ex); } + + return Future.succeededFuture(); } /** diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java index c4b6b9a34..342da85b2 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java @@ -18,6 +18,7 @@ package org.apache.cassandra.sidecar.job; +import java.util.List; import java.util.UUID; import org.slf4j.Logger; @@ -32,6 +33,7 @@ import org.apache.cassandra.sidecar.common.utils.Preconditions; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.tasks.Task; +import org.jetbrains.annotations.NotNull; /** * An abstract class representing operational jobs that run on Cassandra @@ -100,11 +102,12 @@ public boolean isStale(long referenceTimestampInMillis, long ttlInMillis) } /** - * The concrete-job-specific implementation to determine if the job is running on the Cassandra node. - * @return true if the job is running on the Cassandra node. For example, node decommission is tracked by the - * operationMode exposed from Cassandra. + * The concrete-job-specific implementation to determine if the job has a conflict with other operations on the same node. + * + * @param sameOperationJobs list of jobs being tracked by the tracker that have the same operation + * @return true if the job has a conflict. */ - public abstract boolean isRunningOnCassandra(); + public abstract boolean hasConflict(@NotNull List sameOperationJobs); /** * Determines the status of the job. OperationalJob subclasses could choose to override the method. 
@@ -167,6 +170,8 @@ public Future asyncResult(TaskExecutorPool executorPool, DurationSpec wait // complete the max wait time promise either when exceeding the wait time, or the result is available Promise maxWaitTimePromise = Promise.promise(); + + // Pool's setTimer executes the handler on the pool's thread, but the timer is still managed by vertx eventloop executorPool.setTimer(waitTime.toMillis(), d -> maxWaitTimePromise.tryComplete(true)); // complete with true, meaning timeout resultFut.onComplete(res -> maxWaitTimePromise.tryComplete(false)); // complete with false, meaning not timeout Future maxWaitTimeFut = maxWaitTimePromise.future(); @@ -187,9 +192,9 @@ public Future asyncResult(TaskExecutorPool executorPool, DurationSpec wait } /** - * OperationalJob body. The implementation is executed in a blocking manner. + * OperationalJob body. The implementation returns a Future representing the job execution. */ - protected abstract void executeInternal(); + protected abstract Future executeInternal(); /** * Execute the job behavior as specified in the internal execution {@link #executeInternal()}, @@ -200,21 +205,27 @@ public void execute(Promise promise) { isExecuting = true; LOGGER.info("Executing job. jobId={}", jobId); - promise.future().onComplete(executionPromise); + promise.future().onComplete(executionPromise); try { - // Blocking call to perform concrete job-specific execution, returning the status - executeInternal(); - promise.tryComplete(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Complete job execution. jobId={} status={}", jobId, status()); - } + Future internalFuture = executeInternal(); + internalFuture.onComplete(ar -> { + if (ar.succeeded()) + { + promise.tryComplete(); + LOGGER.info("Complete job execution. jobId={} status={}", jobId, status()); + } + else + { + promise.tryFail(ar.cause()); + LOGGER.error("Job execution failed. 
jobId={} reason={}", jobId, ar.cause().getMessage(), ar.cause()); + } + }); } catch (Throwable e) { OperationalJobException oje = OperationalJobException.wraps(e); - LOGGER.error("Job execution failed. jobId={} reason='{}'", jobId, oje.getMessage(), oje); + LOGGER.error("Job execution failed. jobId={} reason={}", jobId, oje.getMessage(), oje); promise.tryFail(oje); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java index 6d36f353d..073a8ecb2 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java @@ -33,6 +33,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; +import org.jetbrains.annotations.NotNull; /** * An abstraction of the management and tracking of long-running jobs running on the sidecar. @@ -126,9 +127,8 @@ public void trySubmitJob(OperationalJob job, */ private void checkConflict(OperationalJob job) throws OperationalJobConflictException { - // If there are no tracked running jobs for same operation, then we confirm downstream - // Downstream check is done in most cases - by design - if (!jobTracker.inflightJobsByOperation(job.name()).isEmpty() || job.isRunningOnCassandra()) + @NotNull List sameOperationJobs = jobTracker.inflightJobsByOperation(job.name()); + if (job.hasConflict(sameOperationJobs)) { throw new OperationalJobConflictException("The same operational job is already running on Cassandra. 
operationName='" + job.name() + '\''); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java new file mode 100644 index 000000000..b4b3877b1 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.job; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.adapters.base.RepairOptions; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; +import org.apache.cassandra.sidecar.config.RepairJobsConfiguration; +import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of {@link OperationalJob} to perform repair operation. + *

+ * Cassandra repair operations are asynchronous and can run for extended periods. When a repair is + * detected to be IN_PROGRESS, this job sets its status to RUNNING and completes its promise. + *

+ * The job's status can be checked at any time using the {@link #status()} method, which directly + * queries the current repair status from Cassandra, regardless of whether the promise has been completed. + *

+ * This design ensures timely responses to clients while allowing accurate status reporting for + * long-running repair operations. + */ +public class RepairJob extends OperationalJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RepairJob.class); + private static final String OPERATION = "repair"; + private static final String PREVIEW_KIND_REPAIRED = "REPAIRED"; + + private final RepairRequestParam repairParams; + private final RepairJobsConfiguration config; + private final TaskExecutorPool internalPool; + protected StorageOperations storageOperations; + private volatile OperationalJobStatus currentStatus; + private volatile int commandId = -1; // Store the command ID for status checks + + /** + * Enum representing the status of a parent repair session + */ + public enum ParentRepairStatus + { + IN_PROGRESS, COMPLETED, FAILED + } + + /** + * Constructor for creation of RepairJob + * + * @param taskExecutorPool TaskExecutorPool instance (for testing) + * @param config Repair job configuration + * @param jobId UUID representing the Job to be created + * @param storageOps Reference to the storage operations interface + * @param repairParams Repair request parameters + */ + public RepairJob(TaskExecutorPool taskExecutorPool, + RepairJobsConfiguration config, + UUID jobId, + StorageOperations storageOps, + RepairRequestParam repairParams) + { + super(jobId); + this.internalPool = taskExecutorPool; + this.config = config; + this.storageOperations = storageOps; + this.repairParams = repairParams; + this.currentStatus = OperationalJobStatus.CREATED; + } + + /** + * {@inheritDoc} + * RepairJob allows parallel executions since multiple repairs can run concurrently + * on different tables or with different parameters. 
+ */ + @Override + public boolean hasConflict(@NotNull List sameOperationJobs) + { + // For now, we simply allow all repair jobs to run in parallel + // A more sophisticated implementation could check the repair parameters + // against currently running repairs to detect potential conflicts + return false; + } + + @Override + protected Future executeInternal() + { + try + { + Map options = generateRepairOptions(repairParams.requestPayload()); + String keyspace = repairParams.keyspace().name(); + + LOGGER.info("Executing repair operation for keyspace {} jobId={}", keyspace, this.jobId()); + + try + { + commandId = storageOperations.repair(keyspace, options); + } + catch (Exception e) + { + LOGGER.error("Failed to initiate repair for keyspace {} jobId={}", keyspace, this.jobId(), e); + currentStatus = OperationalJobStatus.FAILED; + return Future.failedFuture(e); + } + + if (commandId <= 0) + { + // When there are no relevant token ranges for the keyspace or RF is 1, the repair is inapplicable + LOGGER.info("Repair is not applicable for the provided options and keyspace '{}' jobId '{}'", keyspace, this.jobId()); + currentStatus = OperationalJobStatus.SUCCEEDED; + return Future.succeededFuture(); + } + + // Create promise for job submission response - this completes when we have a definitive + // status to return to the client (either repair is running, completed, or failed) + final Promise jobSubmissionPromise = Promise.promise(); + int maxAttempts = config.repairStatusMaxAttempts(); + final AtomicInteger attemptCounter = new AtomicInteger(0); + + // Periodic timer that checks for a valid repair status for a specified no. attempts to make a best-effort attempt + // to validate that the repair has been kicked-off before returning. 
+ internalPool.setPeriodic(0, config.repairPollInterval().toIntMillis(), id -> { + try + { + int currentAttempt = attemptCounter.incrementAndGet(); + if (currentAttempt > maxAttempts) + { + internalPool.cancelTimer(id); + String msg = String.format("Failed to obtain repair status after %d attempts.", maxAttempts); + LOGGER.warn(msg); + // Set status to RUNNING and complete the promise to ensure the job is properly handled + currentStatus = OperationalJobStatus.RUNNING; + jobSubmissionPromise.tryComplete(); + return; + } + + // Check the repair status + List status; + try + { + status = storageOperations.getParentRepairStatus(commandId); + } + catch (Exception e) + { + LOGGER.warn("Failed to get repair status for cmd: {} (attempt {}/{})", + commandId, currentAttempt, maxAttempts, e); + // Continue polling on exception + return; + } + + // If status is empty, continue polling + if (status == null || status.isEmpty()) + { + LOGGER.debug("No parent repair session status found for cmd: {} - repair may be initializing (attempt {}/{})", + commandId, currentAttempt, maxAttempts); + return; + } + + internalPool.cancelTimer(id); + updateRepairJobStatus(jobSubmissionPromise, status); + } + catch (Exception e) + { + LOGGER.error("Unexpected error in repair status check", e); + internalPool.cancelTimer(id); + currentStatus = OperationalJobStatus.FAILED; + jobSubmissionPromise.tryFail(e); + } + }); + + return jobSubmissionPromise.future(); + } + catch (Exception e) + { + // Catch any exceptions in the overall method + LOGGER.error("Failed to execute repair job", e); + currentStatus = OperationalJobStatus.FAILED; + return Future.failedFuture(e); + } + } + + + @Override + public OperationalJobStatus status() + { + // If we have a valid command ID, check the actual repair status + if (commandId > 0) + { + List status = storageOperations.getParentRepairStatus(commandId); + if (status != null && !status.isEmpty()) + { + try + { + ParentRepairStatus parentRepairStatus = 
ParentRepairStatus.valueOf(status.get(0)); + switch (parentRepairStatus) + { + case COMPLETED: + return OperationalJobStatus.SUCCEEDED; + case FAILED: + return OperationalJobStatus.FAILED; + case IN_PROGRESS: + return OperationalJobStatus.RUNNING; + default: + LOGGER.warn("Encountered unexpected repair status: {}", parentRepairStatus); + // Don't update currentStatus here, fall back to parent implementation + } + } + catch (IllegalArgumentException e) + { + LOGGER.warn("Invalid parent repair status: {}", status.get(0), e); + // Don't update currentStatus here, fall back to parent implementation + } + } + } + + // If we have a current status, return it + if (currentStatus != null) + { + return currentStatus; + } + + // Otherwise, fall back to the parent implementation + return super.status(); + } + + /** + * Updates the repair job status based on the parent repair status from Cassandra. + *

+ * When the parent repair status is IN_PROGRESS, this method sets the currentStatus + * to RUNNING and completes the promise + *

+ * This approach ensures that resources are properly managed while still providing + * accurate status reporting through the {@link #status()} method. + * + * @param jobSubmissionPromise the promise to complete + * @param status the parent repair status from Cassandra + */ + private void updateRepairJobStatus(Promise jobSubmissionPromise, List status) + { + ParentRepairStatus parentRepairStatus = ParentRepairStatus.valueOf(status.get(0)); + List messages = status.subList(1, status.size()); + + LOGGER.info("Parent repair session {} has {} status. Messages: {}", + parentRepairStatus.name().toLowerCase(), + parentRepairStatus, + String.join("\n", messages)); + switch (parentRepairStatus) + { + case COMPLETED: + currentStatus = OperationalJobStatus.SUCCEEDED; + jobSubmissionPromise.tryComplete(); + break; + case FAILED: + currentStatus = OperationalJobStatus.FAILED; + String reason = !messages.isEmpty() ? messages.get(0) : + "Repair failed with no error message"; + jobSubmissionPromise.tryFail(new IOException(reason)); + break; + case IN_PROGRESS: + currentStatus = OperationalJobStatus.RUNNING; + jobSubmissionPromise.tryComplete(); + break; + default: + String message = String.format("Encountered unexpected repair status: %s Messages: %s", + parentRepairStatus, String.join("\n", messages)); + LOGGER.error(message); + currentStatus = OperationalJobStatus.FAILED; + jobSubmissionPromise.tryFail(message); + break; + } + } + + /** + * {@inheritDoc} + */ + @Override + public String name() + { + return OPERATION; + } + + private Map generateRepairOptions(RepairPayload repairPayload) + { + Map options = new HashMap<>(); + + List tables = repairPayload.tables(); + if (tables != null && !tables.isEmpty()) + { + options.put(RepairOptions.COLUMNFAMILIES.optionName(), String.join(",", tables)); + } + + Boolean isPrimaryRange = repairPayload.isPrimaryRange(); + if (isPrimaryRange != null) + { + options.put(RepairOptions.PRIMARY_RANGE.optionName(), String.valueOf(isPrimaryRange)); + 
} + // TODO: Verify use-cases involving multiple DCs + + String dc = repairPayload.datacenter(); + if (dc != null) + { + options.put(RepairOptions.DATACENTERS.optionName(), dc); + } + + List hosts = repairPayload.hosts(); + if (hosts != null && !hosts.isEmpty()) + { + options.put(RepairOptions.HOSTS.optionName(), String.join(",", hosts)); + } + + if (repairPayload.startToken() != null && repairPayload.endToken() != null) + { + // Use original string values for range construction + options.put(RepairOptions.RANGES.optionName(), repairPayload.startToken() + ":" + repairPayload.endToken()); + } + + if (repairPayload.repairType() == RepairPayload.RepairType.INCREMENTAL) + { + options.put(RepairOptions.INCREMENTAL.optionName(), Boolean.TRUE.toString()); + } + + if (repairPayload.force() != null) + { + options.put(RepairOptions.FORCE_REPAIR.optionName(), String.valueOf(repairPayload.force())); + } + + // Only enable preview when validation is explicitly requested; a non-null FALSE must not enable it + if (Boolean.TRUE.equals(repairPayload.shouldValidate())) + { + options.put(RepairOptions.PREVIEW.optionName(), PREVIEW_KIND_REPAIRED); + } + return options; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java index ebf94af94..78083f70c 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java @@ -49,6 +49,7 @@ import org.apache.cassandra.sidecar.handlers.NodeDecommissionHandler; import org.apache.cassandra.sidecar.handlers.NodeDrainHandler; import org.apache.cassandra.sidecar.handlers.OperationalJobHandler; +import org.apache.cassandra.sidecar.handlers.RepairHandler; import org.apache.cassandra.sidecar.handlers.RingHandler; import org.apache.cassandra.sidecar.handlers.SchemaHandler; import org.apache.cassandra.sidecar.handlers.StreamStatsHandler; @@ -56,6 +57,7 @@ import 
org.apache.cassandra.sidecar.handlers.TokenRangeReplicaMapHandler; import org.apache.cassandra.sidecar.handlers.cassandra.NodeSettingsHandler; import org.apache.cassandra.sidecar.handlers.v2.cassandra.V2NodeSettingsHandler; +import org.apache.cassandra.sidecar.handlers.validations.ValidateKeyspaceExistenceHandler; import org.apache.cassandra.sidecar.handlers.validations.ValidateTableExistenceHandler; import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey; import org.apache.cassandra.sidecar.modules.multibindings.TableSchemaMapKeys; @@ -180,14 +182,35 @@ VertxRoute cassandraNodeDrainRoute(RouteBuilder.Factory factory, return factory.buildRouteWithHandler(nodeDrainHandler); } + @PUT + @Path(ApiEndpointsV1.REPAIR_ROUTE) + @Operation(summary = "Trigger a repair operation", + description = "Submits a repair operation on the Cassandra node and returns the status of the submitted job") + @APIResponse(description = "Repair operation status retrieved successfully", + responseCode = "200", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = OperationalJobResponse.class))) + @ProvidesIntoMap + @KeyClassMapKey(VertxRouteMapKeys.CassandraRepairRouteKey.class) + VertxRoute cassandraRepairRoute(RouteBuilder.Factory factory, + ValidateKeyspaceExistenceHandler validateKeyspaceExistence, + RepairHandler repairHandler) + { + return factory.builderForRoute() + .setBodyHandler(true) + .handler(validateKeyspaceExistence) + .handler(repairHandler) + .build(); + } + @GET @Path(ApiEndpointsV1.STREAM_STATS_ROUTE) @Operation(summary = "Get stream statistics", - description = "Returns streaming statistics for the Cassandra node") + description = "Returns streaming statistics for the Cassandra node") @APIResponse(description = "Stream statistics retrieved successfully", - responseCode = "200", - content = @Content(mediaType = "application/json", - schema = @Schema(implementation = StreamStatsResponse.class))) + responseCode = "200", + content = @Content(mediaType = 
"application/json", + schema = @Schema(implementation = StreamStatsResponse.class))) @ProvidesIntoMap @KeyClassMapKey(VertxRouteMapKeys.CassandraStreamStatsRouteKey.class) VertxRoute cassandraStreamStatsRoute(RouteBuilder.Factory factory, diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java index c798fbde6..f3496248f 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java @@ -89,6 +89,13 @@ interface CassandraNodeDrainRouteKey extends RouteClassKey HttpMethod HTTP_METHOD = HttpMethod.PUT; String ROUTE_URI = ApiEndpointsV1.NODE_DRAIN_ROUTE; } + + interface CassandraRepairRouteKey extends RouteClassKey + { + HttpMethod HTTP_METHOD = HttpMethod.PUT; + String ROUTE_URI = ApiEndpointsV1.REPAIR_ROUTE; + } + interface CassandraNodeSettingsRouteKey extends RouteClassKey { HttpMethod HTTP_METHOD = HttpMethod.GET; diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java index 7d985e2d4..14c8263bb 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java @@ -51,6 +51,7 @@ import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.ParameterizedClassConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.PeriodicTaskConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.RepairJobsConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SSTableUploadConfigurationImpl; import 
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; @@ -115,6 +116,7 @@ public SidecarConfiguration configuration(CoordinationConfiguration clusterLease .build()) .coordinationConfiguration(clusterLeaseClaimTaskConfiguration) .sstableUploadConfiguration(new SSTableUploadConfigurationImpl(0F)) + .repairConfiguration(new RepairJobsConfigurationImpl()) .build(); PeriodicTaskConfiguration healthCheckConfiguration = new PeriodicTaskConfigurationImpl(true, diff --git a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java index db83cedda..ee859ebf7 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java @@ -34,7 +34,9 @@ import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; import org.apache.cassandra.sidecar.config.yaml.MetricsFilteringConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; +import org.apache.cassandra.sidecar.exceptions.ConfigurationException; import org.assertj.core.api.Condition; import static org.apache.cassandra.sidecar.common.ResourceUtils.writeResourceToPath; @@ -800,6 +802,18 @@ void validateMetricsConfiguration(MetricsConfiguration config) assertThat(config.excludeConfigurations()).isNotNull(); } + @Test + void testInvalidOperationalJobExecutionMaxWaitTime() + { + String yaml = "sidecar:\n" + + " " + ServiceConfigurationImpl.OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_PROPERTY + ": 2m"; + assertThatExceptionOfType(JsonMappingException.class) + .isThrownBy(() -> 
SidecarConfigurationImpl.fromYamlString(yaml)) + .withRootCauseInstanceOf(ConfigurationException.class) + .withMessageContaining("Invalid " + ServiceConfigurationImpl.OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_PROPERTY + + " value (2m). The maximum allowed value is 1m."); + } + private Path yaml(String resourceName) { ClassLoader classLoader = this.getClass().getClassLoader(); diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/CommonTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/CommonTest.java index 9d0a5c338..d742a9e20 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/CommonTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/CommonTest.java @@ -40,6 +40,7 @@ import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.modules.SidecarModules; import org.apache.cassandra.sidecar.server.Server; @@ -61,7 +62,7 @@ void before() throws InterruptedException { Injector injector; Module testOverride = Modules.override(new TestModule()) - .with(new CommonTestModule()); + .with(new CommonTestModule(delegate)); injector = Guice.createInjector(Modules.override(SidecarModules.all()) .with(testOverride)); vertx = injector.getInstance(Vertx.class); @@ -84,25 +85,66 @@ void after() throws InterruptedException LOGGER.error("Close event timed out."); } - class CommonTestModule extends AbstractModule + /** + * Common test module to provide default mocks for testing + */ + public static class CommonTestModule extends AbstractModule { + protected static final int DEFAULT_INSTANCE_ID = 100; + protected static final String DEFAULT_HOST = "127.0.0.1"; + protected static final int DEFAULT_PORT = 9042; + + protected CassandraAdapterDelegate delegate; + 
protected StorageOperations storageOperations; + + /** + * Basic test module with default mocks + */ + public CommonTestModule() + { + this.delegate = mock(CassandraAdapterDelegate.class); + this.storageOperations = mock(StorageOperations.class); + when(delegate.storageOperations()).thenReturn(storageOperations); + } + + /** + * Test module with custom delegate + * + * @param delegate the CassandraAdapterDelegate mock to use + */ + public CommonTestModule(CassandraAdapterDelegate delegate) + { + this.delegate = delegate; + this.storageOperations = delegate.storageOperations(); + } + + /** + * Test module with custom StorageOperations + * + * @param storageOperations the StorageOperations mock to use + */ + public CommonTestModule(StorageOperations storageOperations) + { + this.delegate = mock(CassandraAdapterDelegate.class); + this.storageOperations = storageOperations; + when(delegate.storageOperations()).thenReturn(storageOperations); + } + @Provides @Singleton public InstancesMetadata instanceConfig() { - int instanceId = 100; - String host = "127.0.0.1"; InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); - when(instanceMetadata.host()).thenReturn(host); - when(instanceMetadata.port()).thenReturn(9042); - when(instanceMetadata.id()).thenReturn(instanceId); + when(instanceMetadata.host()).thenReturn(DEFAULT_HOST); + when(instanceMetadata.port()).thenReturn(DEFAULT_PORT); + when(instanceMetadata.id()).thenReturn(DEFAULT_INSTANCE_ID); when(instanceMetadata.stagingDir()).thenReturn(""); when(instanceMetadata.delegate()).thenReturn(delegate); InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata)); - when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata); - when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata); + 
when(mockInstancesMetadata.instanceFromId(DEFAULT_INSTANCE_ID)).thenReturn(instanceMetadata); + when(mockInstancesMetadata.instanceFromHost(DEFAULT_HOST)).thenReturn(instanceMetadata); return mockInstancesMetadata; } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java index bc6f8cad1..5a36bfde6 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java @@ -40,6 +40,7 @@ import com.google.inject.Provides; import com.google.inject.Singleton; import com.google.inject.util.Modules; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.client.predicate.ResponsePredicate; @@ -150,14 +151,15 @@ protected SampleOperationalJob(UUID jobId) } @Override - public boolean isRunningOnCassandra() + public boolean hasConflict(List jobs) { return false; } @Override - protected void executeInternal() throws OperationalJobException + protected Future executeInternal() throws OperationalJobException { + return Future.succeededFuture(); } } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandlerTest.java index 70d530bf1..ba4fa48e9 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandlerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandlerTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.sidecar.handlers; -import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -29,12 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import 
com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Module; -import com.google.inject.Provides; -import com.google.inject.Singleton; import com.google.inject.util.Modules; import io.vertx.core.Vertx; import io.vertx.ext.web.client.WebClient; @@ -42,9 +38,6 @@ import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; import org.apache.cassandra.sidecar.TestModule; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; -import org.apache.cassandra.sidecar.cluster.InstancesMetadata; -import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.modules.SidecarModules; @@ -78,7 +71,7 @@ void before() throws InterruptedException { Injector injector; Module testOverride = Modules.override(new TestModule()) - .with(new NodeDecommissionHandlerTest.NodeDecommissionTestModule()); + .with(new CommonTest.CommonTestModule(mockStorageOperations)); injector = Guice.createInjector(Modules.override(SidecarModules.all()) .with(testOverride)); vertx = injector.getInstance(Vertx.class); @@ -172,36 +165,4 @@ void testDecommissionConflict(VertxTestContext context) context.completeNow(); })); } - - /** - * Test guice module for Node Decommission handler tests - */ - class NodeDecommissionTestModule extends AbstractModule - { - @Provides - @Singleton - public InstancesMetadata instanceMetadata() - { - final int instanceId = 100; - final String host = "127.0.0.1"; - final InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); - when(instanceMetadata.host()).thenReturn(host); - when(instanceMetadata.port()).thenReturn(9042); - when(instanceMetadata.id()).thenReturn(instanceId); - when(instanceMetadata.stagingDir()).thenReturn(""); - - CassandraAdapterDelegate delegate = 
mock(CassandraAdapterDelegate.class); - - when(delegate.storageOperations()).thenReturn(mockStorageOperations); - when(instanceMetadata.delegate()).thenReturn(delegate); - - InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); - when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata)); - when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata); - when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata); - - return mockInstancesMetadata; - } - } - } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/RepairHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/RepairHandlerTest.java new file mode 100644 index 000000000..cf5851b76 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/RepairHandlerTest.java @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.TableMetadata; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.modules.SidecarModules; +import org.apache.cassandra.sidecar.server.Server; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.mockito.AdditionalAnswers; +import org.mockito.ArgumentCaptor; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.CREATED; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED; +import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RepairHandler} + */ +@ExtendWith(VertxExtension.class) +public class RepairHandlerTest +{ + static final Logger LOGGER = LoggerFactory.getLogger(RepairHandlerTest.class); + private static final String REPAIR_ROUTE = "/api/v1/cassandra/keyspaces/testkeyspace/repair"; + Vertx vertx; + Server server; + StorageOperations mockStorageOperations = mock(StorageOperations.class); + InstanceMetadataFetcher mockMetadataFetcher = mock(InstanceMetadataFetcher.class); + + @BeforeEach + void before() throws InterruptedException + { + // Set up the mock metadata chain + InstanceMetadata mockInstanceMetadata = mock(InstanceMetadata.class); + CassandraAdapterDelegate mockDelegate = mock(CassandraAdapterDelegate.class); + Metadata mockMetadata = mock(Metadata.class); + KeyspaceMetadata mockKeyspaceMetadata = mock(KeyspaceMetadata.class); + TableMetadata mockTableMetadata = mock(TableMetadata.class); + + // Configure the mock chain + when(mockMetadataFetcher.instance(anyString())).thenReturn(mockInstanceMetadata); + when(mockMetadataFetcher.delegate(anyString())).thenReturn(mockDelegate); // Add this line to fix the NPE + when(mockInstanceMetadata.delegate()).thenReturn(mockDelegate); + when(mockDelegate.metadata()).thenReturn(mockMetadata); + when(mockDelegate.storageOperations()).thenReturn(mockStorageOperations); + when(mockMetadata.getKeyspace(anyString())).thenReturn(mockKeyspaceMetadata); + when(mockKeyspaceMetadata.getTable(anyString())).thenReturn(mockTableMetadata); + + AbstractModule repairTestModule = 
new AbstractModule() + { + @Override + protected void configure() + { + // Bind the mocks needed for the test + bind(StorageOperations.class).toInstance(mockStorageOperations); + bind(InstanceMetadataFetcher.class).toInstance(mockMetadataFetcher); + } + }; + + // Create the injector with the proper module overrides + Injector injector = Guice.createInjector( + Modules.override(SidecarModules.all()) + .with(Modules.override(new TestModule()) + .with(new CommonTest.CommonTestModule(mockStorageOperations), repairTestModule)) + ); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void testRepairHandler(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + RepairPayload payload = RepairPayload.builder() + .isPrimaryRange(true) + .repairType(RepairPayload.RepairType.INCREMENTAL) + .tables(List.of("test_table")) + .build(); + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + LOGGER.info("Repair Response: {}", response.bodyAsString()); + + OperationalJobResponse repairResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(repairResponse).isNotNull(); + assertThat(repairResponse.status()).isEqualTo(SUCCEEDED); + context.completeNow(); + })); + } + + @Test + void 
testRepairHandlerIR(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + ArgumentCaptor> jobCapture = ArgumentCaptor.forClass(Map.class); + RepairPayload payload = RepairPayload.builder() + .repairType(RepairPayload.RepairType.INCREMENTAL) + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + LOGGER.info("Repair Response: {}", response.bodyAsString()); + + OperationalJobResponse repairResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(repairResponse).isNotNull(); + assertThat(repairResponse.status()).isEqualTo(SUCCEEDED); + verify(mockStorageOperations).repair(anyString(), jobCapture.capture()); + assertThat(jobCapture.getValue()).containsKey("incremental"); + assertThat(jobCapture.getValue().get("incremental")).isEqualTo("true"); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerWithRanges(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + String expectedRanges = 0L + ":" + Integer.MAX_VALUE; + ArgumentCaptor> jobCapture = ArgumentCaptor.forClass(Map.class); + RepairPayload payload = RepairPayload.builder() + .startToken("0") + .endToken(Integer.toString(Integer.MAX_VALUE)) + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + LOGGER.info("Repair Response: {}", response.bodyAsString()); + + OperationalJobResponse repairResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(repairResponse).isNotNull(); + assertThat(repairResponse.status()).isEqualTo(SUCCEEDED); + verify(mockStorageOperations).repair(anyString(), 
jobCapture.capture()); + assertThat(jobCapture.getValue()).containsKey("ranges"); + assertThat(jobCapture.getValue().get("ranges")).isEqualTo(expectedRanges); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerWithHosts(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + ArgumentCaptor> jobCapture = ArgumentCaptor.forClass(Map.class); + RepairPayload payload = RepairPayload.builder() + .hosts(List.of("127.0.0.1")) + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + LOGGER.info("Repair Response: {}", response.bodyAsString()); + + OperationalJobResponse repairResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(repairResponse).isNotNull(); + assertThat(repairResponse.status()).isEqualTo(SUCCEEDED); + verify(mockStorageOperations).repair(anyString(), jobCapture.capture()); + assertThat(jobCapture.getValue()).containsKey("hosts"); + assertThat(jobCapture.getValue().get("hosts")).isEqualTo("127.0.0.1"); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerLongRunning(VertxTestContext context) + { + // Simulate repair invocation taking 6s + doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null)) + .when(mockStorageOperations).repair(anyString(), any()); + + WebClient client = WebClient.create(vertx); + RepairPayload payload = RepairPayload.builder() + .isPrimaryRange(true) + .tables(List.of("test_table")) + .build(); + + // Since it was the OperationalJobExecutionTimeout that triggered the response, the job is still + // in CREATED state, as we returned before polling for the job status. 
+ client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(ACCEPTED.code()); + LOGGER.info("Repair Response: {}", response.bodyAsString()); + + OperationalJobResponse repairResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(repairResponse).isNotNull(); + assertThat(repairResponse.status()).isEqualTo(CREATED); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerBadRequest(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerFailed(VertxTestContext context) + { + doThrow(new RuntimeException("Simulated failure")).when(mockStorageOperations).repair(anyString(), any()); + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/keyspaces/testkeyspace/repair"; + RepairPayload payload = RepairPayload.builder() + .isPrimaryRange(true) + .tables(List.of("test_table")) + .build(); + + + client.put(server.actualPort(), "127.0.0.1", testRoute) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse repairResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(repairResponse).isNotNull(); + assertThat(repairResponse.jobId()).isNotNull(); + assertThat(repairResponse.status()).isEqualTo(FAILED); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerInvalidTokenRange_StartGreaterThanEnd(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + RepairPayload payload = 
RepairPayload.builder() + .startToken("100") + .endToken("50") // End token less than start token + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + assertThat(response.bodyAsString()).contains("Start token must be less than end token"); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerInvalidTokenRange_StartEqualToEnd(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + RepairPayload payload = RepairPayload.builder() + .startToken("100") + .endToken("100") // End token equal to start token + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + assertThat(response.bodyAsString()).contains("Start token must be less than end token"); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerInvalidTokenFormat_NonNumericStartToken(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + RepairPayload payload = RepairPayload.builder() + .startToken("invalid_token") + .endToken("100") + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + assertThat(response.bodyAsString()).contains("Invalid token format"); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerInvalidTokenFormat_NonNumericEndToken(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + RepairPayload payload = 
RepairPayload.builder() + .startToken("50") + .endToken("invalid_token") + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + assertThat(response.bodyAsString()).contains("Invalid token format"); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerValidLargeTokenRange(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + ArgumentCaptor> jobCapture = ArgumentCaptor.forClass(Map.class); + // Test with very large token values (BigInteger range) + String startToken = "123456789012345678901234567890"; + String endToken = "987654321098765432109876543210"; + RepairPayload payload = RepairPayload.builder() + .startToken(startToken) + .endToken(endToken) + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse repairResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(repairResponse).isNotNull(); + assertThat(repairResponse.status()).isEqualTo(SUCCEEDED); + verify(mockStorageOperations).repair(anyString(), jobCapture.capture()); + assertThat(jobCapture.getValue()).containsKey("ranges"); + assertThat(jobCapture.getValue().get("ranges")).isEqualTo(startToken + ":" + endToken); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerValidNegativeTokenRange(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + ArgumentCaptor> jobCapture = ArgumentCaptor.forClass(Map.class); + // Test with negative token values + String startToken = "-1000"; + String endToken = "1000"; + RepairPayload payload = RepairPayload.builder() + 
.startToken(startToken) + .endToken(endToken) + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse repairResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(repairResponse).isNotNull(); + assertThat(repairResponse.status()).isEqualTo(SUCCEEDED); + verify(mockStorageOperations).repair(anyString(), jobCapture.capture()); + assertThat(jobCapture.getValue()).containsKey("ranges"); + assertThat(jobCapture.getValue().get("ranges")).isEqualTo(startToken + ":" + endToken); + context.completeNow(); + })); + } + + @Test + void testRepairHandlerOnlyStartTokenProvided(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + ArgumentCaptor> jobCapture = ArgumentCaptor.forClass(Map.class); + // Test with only start token (should not add ranges option) + RepairPayload payload = RepairPayload.builder() + .startToken("100") + // No end token + .tables(List.of("test_table")) + .build(); + + client.put(server.actualPort(), "127.0.0.1", REPAIR_ROUTE) + .putHeader("Content-Type", "application/json") + .sendJson(payload, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse repairResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(repairResponse).isNotNull(); + assertThat(repairResponse.status()).isEqualTo(SUCCEEDED); + verify(mockStorageOperations).repair(anyString(), jobCapture.capture()); + // Should not contain ranges since only start token was provided + assertThat(jobCapture.getValue()).doesNotContainKey("ranges"); + context.completeNow(); + })); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java 
index 72df6fb13..d60b987b3 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.job; import java.io.IOException; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -73,40 +74,34 @@ void testIsRunningOnCassandra_WhenDraining() { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_DRAINING); - assertThat(nodeDrainJob.isRunningOnCassandra()).isTrue(); + assertThat(nodeDrainJob.hasConflict(Collections.emptyList())).isTrue(); } @Test void testIsRunningOnCassandra_WhenDrained() { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_DRAINED); - - assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse(); + assertThat(nodeDrainJob.hasConflict(Collections.emptyList())).isFalse(); } @Test void testIsRunningOnCassandra_WhenNormal() { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); - - assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse(); - } + assertThat(nodeDrainJob.hasConflict(Collections.emptyList())).isFalse(); } @Test void testIsRunningOnCassandra_WhenUnknownState() { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_UNKNOWN); - - assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse(); + assertThat(nodeDrainJob.hasConflict(Collections.emptyList())).isFalse(); } @Test void testIsRunningOnCassandra_WhenNull() { when(mockStorageOperations.operationMode()).thenReturn(null); - - assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse(); - } + assertThat(nodeDrainJob.hasConflict(Collections.emptyList())).isFalse(); } @Test void testStatus_WhenDraining() diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java index d35745643..1b062f8a6 
100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.sidecar.job; +import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,6 +29,7 @@ import org.junit.jupiter.api.Test; import com.datastax.driver.core.utils.UUIDs; +import io.vertx.core.Future; import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.TestResourceReaper; import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; @@ -123,7 +125,7 @@ void testWithLongRunningJob() throws InterruptedException // Job should be running initially assertThat(testJob.asyncResult().isComplete()).isFalse(); assertThat(tracker.get(jobId)).isNotNull(); - + // Wait for completion assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(testJob.asyncResult().isComplete()).isTrue(); @@ -141,13 +143,13 @@ void testWithFailingJob() throws InterruptedException OperationalJob failingJob = new OperationalJob(jobId) { @Override - public boolean isRunningOnCassandra() + public boolean hasConflict(List jobs) { return false; } @Override - protected void executeInternal() throws OperationalJobException + protected Future executeInternal() throws OperationalJobException { throw new OperationalJobException(msg); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java index 9dab537b7..fedd3c391 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.sidecar.job; +import java.util.List; import java.util.UUID; import com.google.common.util.concurrent.Uninterruptibles; 
@@ -60,12 +61,13 @@ public static OperationalJob createOperationalJob(UUID jobId, OperationalJobStat return new OperationalJob(jobId) { @Override - protected void executeInternal() throws OperationalJobException + protected Future executeInternal() throws OperationalJobException { + return Future.succeededFuture(); } @Override - public boolean isRunningOnCassandra() + public boolean hasConflict(List jobs) { return jobStatus == OperationalJobStatus.RUNNING; } @@ -94,13 +96,13 @@ public static OperationalJob createOperationalJob(UUID jobId, DurationSpec jobDu return new OperationalJob(jobId) { @Override - public boolean isRunningOnCassandra() + public boolean hasConflict(List jobs) { return false; } @Override - protected void executeInternal() throws OperationalJobException + protected Future executeInternal() throws OperationalJobException { if (jobDuration != null) { @@ -111,6 +113,7 @@ protected void executeInternal() throws OperationalJobException { throw jobFailure; } + return Future.succeededFuture(); } @Override @@ -146,13 +149,13 @@ void testJobFailed() OperationalJob failingJob = new OperationalJob(UUIDs.timeBased()) { @Override - public boolean isRunningOnCassandra() + public boolean hasConflict(List jobs) { return false; } @Override - protected void executeInternal() throws OperationalJobException + protected Future executeInternal() throws OperationalJobException { throw new OperationalJobException(msg); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java new file mode 100644 index 000000000..b9a232cb4 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.datastax.driver.core.utils.UUIDs; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.TestResourceReaper; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.data.Name; +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; +import org.apache.cassandra.sidecar.config.RepairJobsConfiguration; +import org.apache.cassandra.sidecar.config.yaml.RepairJobsConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; +import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; +import 
org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
import org.mockito.Mockito;

import java.util.concurrent.ExecutionException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests to validate the job that runs repairs.
 *
 * Each test drives a {@link RepairJob} against a mocked {@link StorageOperations},
 * covering: successful completion, in-progress polling, polling timeout, timer
 * lifecycle on success and failure, parallel submission through the
 * {@link OperationalJobManager}, and handling of an empty repair status.
 */
public class RepairJobTest
{
    // Maximum number of status-poll attempts before the job stops polling
    private static final int MAX_ATTEMPTS = 5;
    // Short poll interval (millis) to keep the tests fast
    private static final int POLL_INTERVAL_MILLIS = 100;
    protected Vertx vertx;
    protected ExecutorPools executorPool;

    @BeforeEach
    void setup()
    {
        vertx = Vertx.vertx();
        executorPool = new ExecutorPools(vertx, new ServiceConfigurationImpl());
    }

    @AfterEach
    void cleanup()
    {
        TestResourceReaper.create().with(vertx).with(executorPool).close();
    }

    @Test
    void testRepairJob() throws Exception
    {
        StorageOperations storageOperations = mock(StorageOperations.class);
        when(storageOperations.getParentRepairStatus(anyInt()))
        .thenReturn(Collections.singletonList(RepairJob.ParentRepairStatus.COMPLETED.name()));
        when(storageOperations.repair(any(), any())).thenReturn(1);

        RepairJobsConfiguration config = new RepairJobsConfigurationImpl(MAX_ATTEMPTS, POLL_INTERVAL_MILLIS);
        RepairJob testJob = new RepairJob(executorPool.internal(), config, UUIDs.timeBased(),
                                          storageOperations, repairParams("testkeyspace", "testtable"));

        Promise<Void> promise = Promise.promise();
        testJob.execute(promise);
        awaitCompletion(promise, 1);

        assertThat(testJob.asyncResult().isComplete()).isTrue();
        assertThat(testJob.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
    }

    @Test
    void testRunningRepairJob()
    {
        StorageOperations storageOperations = mock(StorageOperations.class);
        when(storageOperations.getParentRepairStatus(anyInt()))
        .thenReturn(Collections.singletonList(RepairJob.ParentRepairStatus.IN_PROGRESS.name()));
        when(storageOperations.repair(any(), any())).thenReturn(1);

        RepairJobsConfiguration config = new RepairJobsConfigurationImpl(MAX_ATTEMPTS, POLL_INTERVAL_MILLIS);
        TaskExecutorPool spyTaskExecutorPool = Mockito.spy(executorPool.internal());
        RepairJob testJob = new RepairJob(spyTaskExecutorPool, config, UUIDs.timeBased(),
                                          storageOperations, repairParams("testkeyspace", "testtable"));

        Promise<Void> promise = Promise.promise();
        testJob.execute(promise);

        // Give the job time to poll the repair status at least twice
        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

        // The execute future completes while the underlying repair is still
        // IN_PROGRESS, so the job reports RUNNING rather than SUCCEEDED
        assertThat(testJob.asyncResult().isComplete()).isTrue();
        assertThat(testJob.status()).isEqualTo(OperationalJobStatus.RUNNING);
        Mockito.verify(storageOperations, atLeast(2)).getParentRepairStatus(anyInt());

        // Verify the polling timer was created (immediately, delay 0) and canceled
        Mockito.verify(spyTaskExecutorPool, atLeast(1)).setPeriodic(Mockito.eq(0L), Mockito.anyLong(), Mockito.any());
        Mockito.verify(spyTaskExecutorPool, atLeast(1)).cancelTimer(Mockito.anyLong());
    }

    @Test
    void testLongRunningRepairJobTimeout() throws Exception
    {
        StorageOperations storageOperations = mock(StorageOperations.class);
        // A null status on every poll simulates a repair that never reports progress,
        // forcing the job to exhaust MAX_ATTEMPTS. A single stubbing suffices:
        // Mockito returns the stubbed value on every subsequent invocation.
        when(storageOperations.getParentRepairStatus(anyInt())).thenReturn(null);
        when(storageOperations.repair(any(), any())).thenReturn(1);

        RepairJobsConfiguration config = new RepairJobsConfigurationImpl(MAX_ATTEMPTS, POLL_INTERVAL_MILLIS);
        TaskExecutorPool spyTaskExecutorPool = Mockito.spy(executorPool.internal());
        RepairJob testJob = new RepairJob(spyTaskExecutorPool, config, UUIDs.timeBased(),
                                          storageOperations, repairParams("testkeyspace", "testtable"));

        Promise<Void> promise = Promise.promise();
        testJob.execute(promise);
        awaitCompletion(promise, 2);

        assertThat(testJob.asyncResult().isComplete()).isTrue();
        // After exhausting the poll attempts the job is still considered RUNNING
        assertThat(testJob.status()).isEqualTo(OperationalJobStatus.RUNNING);

        // Verify the polling timer was created and canceled
        Mockito.verify(spyTaskExecutorPool, atLeast(1)).setPeriodic(Mockito.anyLong(), Mockito.anyLong(), Mockito.any());
        Mockito.verify(spyTaskExecutorPool, atLeast(1)).cancelTimer(Mockito.anyLong());
    }

    @Test
    void testTimersOnCompletion() throws Exception
    {
        TaskExecutorPool spyTaskExecutorPool = Mockito.spy(executorPool.internal());

        StorageOperations storageOperations = mock(StorageOperations.class);
        when(storageOperations.getParentRepairStatus(anyInt()))
        .thenReturn(Arrays.asList(RepairJob.ParentRepairStatus.COMPLETED.name(), "Repair completed successfully"));
        when(storageOperations.repair(any(), any())).thenReturn(1);

        RepairJobsConfiguration config = new RepairJobsConfigurationImpl(MAX_ATTEMPTS, POLL_INTERVAL_MILLIS);
        RepairJob testJob = new RepairJob(spyTaskExecutorPool, config, UUIDs.timeBased(),
                                          storageOperations, repairParams("testkeyspace", "testtable"));

        Promise<Void> promise = Promise.promise();
        testJob.execute(promise);
        awaitCompletion(promise, 5);

        assertThat(testJob.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
        // A successful run must both schedule and cancel the polling timer
        Mockito.verify(spyTaskExecutorPool, atLeast(1)).setPeriodic(Mockito.anyLong(), Mockito.anyLong(), Mockito.any());
        Mockito.verify(spyTaskExecutorPool, atLeast(1)).cancelTimer(Mockito.anyLong());
    }

    @Test
    void testTimersOnFailure() throws Exception
    {
        TaskExecutorPool spyTaskExecutorPool = Mockito.spy(executorPool.internal());

        StorageOperations storageOperations = mock(StorageOperations.class);
        when(storageOperations.getParentRepairStatus(anyInt()))
        .thenReturn(Arrays.asList(RepairJob.ParentRepairStatus.FAILED.name(), "Repair failed with error"));
        when(storageOperations.repair(any(), any())).thenReturn(1);

        RepairJobsConfiguration config = new RepairJobsConfigurationImpl(MAX_ATTEMPTS, POLL_INTERVAL_MILLIS);
        RepairJob testJob = new RepairJob(spyTaskExecutorPool, config, UUIDs.timeBased(),
                                          storageOperations, repairParams("testkeyspace", "testtable"));

        Promise<Void> promise = Promise.promise();
        testJob.execute(promise);

        try
        {
            awaitCompletion(promise, 5);
        }
        catch (ExecutionException e)
        {
            // Expected: the repair reports FAILED, so the job's future fails
        }

        assertThat(testJob.status()).isEqualTo(OperationalJobStatus.FAILED);
        // Even on failure, the polling timer must have been scheduled
        Mockito.verify(spyTaskExecutorPool, atLeast(1)).setPeriodic(Mockito.anyLong(), Mockito.anyLong(), Mockito.any());
    }

    @Test
    void testMultipleRepairJobsRunningInParallel() throws Exception
    {
        OperationalJobTracker tracker = new OperationalJobTracker(10);
        OperationalJobManager manager = new OperationalJobManager(tracker, executorPool);

        StorageOperations storageOperations = mock(StorageOperations.class);
        when(storageOperations.getParentRepairStatus(anyInt()))
        .thenReturn(Collections.singletonList(RepairJob.ParentRepairStatus.COMPLETED.name()));
        when(storageOperations.repair(any(), any())).thenReturn(1);

        RepairJobsConfiguration config = new RepairJobsConfigurationImpl(MAX_ATTEMPTS, POLL_INTERVAL_MILLIS);

        // Jobs over distinct keyspace/table combinations so none conflict
        RepairJob job1 = createRepairJob(config, storageOperations, "keyspace1", "table1");
        RepairJob job2 = createRepairJob(config, storageOperations, "keyspace1", "table2");
        RepairJob job3 = createRepairJob(config, storageOperations, "keyspace2", "table1");

        // NOTE(review): the latch is counted down by each completion callback but never
        // awaited; completion is asserted via the per-job promises below — consider
        // awaiting it (count 3) to also assert that every callback fires
        CountDownLatch latch = new CountDownLatch(1);
        BiConsumer<OperationalJob, Throwable> onComplete = (job, exception) -> {
            assertThat(exception).isNull();
            latch.countDown();
        };

        Promise<Void> promise1 = Promise.promise();
        Promise<Void> promise2 = Promise.promise();
        Promise<Void> promise3 = Promise.promise();

        job1.execute(promise1);
        job2.execute(promise2);
        job3.execute(promise3);

        manager.trySubmitJob(job1, onComplete, executorPool.service(), SecondBoundConfiguration.parse("5s"));
        manager.trySubmitJob(job2, onComplete, executorPool.service(), SecondBoundConfiguration.parse("5s"));
        manager.trySubmitJob(job3, onComplete, executorPool.service(), SecondBoundConfiguration.parse("5s"));

        awaitCompletion(promise1, 2);
        awaitCompletion(promise2, 2);
        awaitCompletion(promise3, 2);

        // Every job completed successfully and was tracked
        for (RepairJob job : Arrays.asList(job1, job2, job3))
        {
            assertThat(job.asyncResult().isComplete()).isTrue();
            assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
            assertThat(tracker.get(job.jobId())).isNotNull();
        }
        assertThat(tracker.jobsView().size()).isEqualTo(3);
    }

    @Test
    void testEmptyRepairStatusHandling() throws Exception
    {
        StorageOperations storageOperations = mock(StorageOperations.class);
        // Empty status twice (repair not yet visible), then COMPLETED — the job
        // must keep polling through the empty responses instead of failing
        when(storageOperations.getParentRepairStatus(anyInt()))
        .thenReturn(Collections.emptyList())
        .thenReturn(Collections.emptyList())
        .thenReturn(Collections.singletonList(RepairJob.ParentRepairStatus.COMPLETED.name()));
        when(storageOperations.repair(any(), any())).thenReturn(1);

        RepairJobsConfiguration config = new RepairJobsConfigurationImpl(MAX_ATTEMPTS, POLL_INTERVAL_MILLIS);
        TaskExecutorPool spyTaskExecutorPool = Mockito.spy(executorPool.internal());
        RepairJob testJob = new RepairJob(spyTaskExecutorPool, config, UUIDs.timeBased(),
                                          storageOperations, repairParams("testkeyspace", "testtable"));

        Promise<Void> promise = Promise.promise();
        testJob.execute(promise);
        awaitCompletion(promise, 5);

        assertThat(testJob.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
        // The repair status was queried at least 4 times before completion
        Mockito.verify(storageOperations, Mockito.atLeast(4)).getParentRepairStatus(anyInt());

        // Verify timers were set and canceled appropriately
        Mockito.verify(spyTaskExecutorPool, atLeast(1)).setPeriodic(Mockito.anyLong(), Mockito.anyLong(), Mockito.any());
        Mockito.verify(spyTaskExecutorPool, atLeast(1)).cancelTimer(Mockito.anyLong());
    }

    /**
     * Builds repair request parameters for a primary-range repair of a single table.
     */
    private static RepairRequestParam repairParams(String keyspace, String table)
    {
        RepairPayload payload = RepairPayload.builder()
                                             .isPrimaryRange(true)
                                             .tables(List.of(table))
                                             .build();
        return RepairRequestParam.from(new Name(keyspace), payload);
    }

    /**
     * Blocks until the job's promise completes, up to the given timeout in seconds.
     *
     * @throws ExecutionException if the awaited future completed exceptionally
     */
    private static void awaitCompletion(Promise<Void> promise, long timeoutSeconds) throws Exception
    {
        if (!promise.future().isComplete())
        {
            promise.future().toCompletionStage().toCompletableFuture().get(timeoutSeconds, TimeUnit.SECONDS);
        }
    }

    /**
     * Helper method to create a repair job with specific parameters using the
     * shared internal executor pool.
     */
    private RepairJob createRepairJob(RepairJobsConfiguration config, StorageOperations storageOperations,
                                      String keyspace, String table)
    {
        return new RepairJob(executorPool.internal(), config, UUIDs.timeBased(), storageOperations,
                             repairParams(keyspace, table));
    }
}