diff --git a/CHANGES.txt b/CHANGES.txt index d2baf2aa1..c4f474506 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.3.0 ----- + * Sidecar API Endpoint for Nodetool Compaction Stop (CASSSIDECAR-360) * Implementation of CDCPublisher (CASSSIDECAR-243) * Sidecar endpoint for moving a node to a new token (CASSSIDECAR-344) * Returning JSON responses for live migration status endpoints in case of errors (CASSSIDECAR-395) diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java index 556c405a0..d8bc9eea0 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java @@ -81,8 +81,11 @@ public CassandraAdapter(DnsResolver dnsResolver, this.tableOperations = Objects.requireNonNull(createTableOperations(jmxClient), "tableOperations is required"); this.compactionManagerOperations = Objects.requireNonNull(createCompactionManagerOperations(jmxClient), "compactionManagerOperations is required"); this.metricsOperations = Objects.requireNonNull(createMetricsOperations(jmxClient, tableSchemaFetcher), "metricsOperations is required"); - this.compactionStatsOperations = Objects.requireNonNull(createCompactionStatsOperations(storageOperations, metricsOperations, - compactionManagerOperations), "compactionStatsOperations is required"); + this.compactionStatsOperations + = Objects.requireNonNull(createCompactionStatsOperations(storageOperations, + metricsOperations, + compactionManagerOperations), + "compactionStatsOperations is required"); } /** diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java index a2de738a1..55421bcee 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java @@ -15,11 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.sidecar.adapters.base; +import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; import org.apache.cassandra.sidecar.adapters.base.jmx.CompactionManagerJmxOperations; import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations; @@ -32,6 +34,11 @@ */ public class CassandraCompactionManagerOperations implements CompactionManagerOperations { + private static final List SUPPORTED_COMPACTION_TYPES = + Arrays.stream(CompactionType.values()) + .map(CompactionType::name) + .collect(Collectors.toList()); + protected final JmxClient jmxClient; /** @@ -53,4 +60,54 @@ public List> getCompactions() return jmxClient.proxy(CompactionManagerJmxOperations.class, COMPACTION_MANAGER_OBJ_NAME) .getCompactions(); } + + /** + * {@inheritDoc} + */ + @Override + public void stopCompactionById(String compactionId) + { + // compactionId takes precedence over type if both are provided + if (compactionId != null && !compactionId.trim().isEmpty()) + { + CompactionManagerJmxOperations proxy = jmxClient.proxy(CompactionManagerJmxOperations.class, + COMPACTION_MANAGER_OBJ_NAME); + proxy.stopCompactionById(compactionId); + } + else + { + throw new IllegalArgumentException("compaction process with compaction ID " + + compactionId + " is null or empty"); + } + } + + @Override + public void stopCompaction(String compactionType) + { + if (compactionType != null && !compactionType.trim().isEmpty()) + { + if (supportedCompactionTypes().contains(compactionType)) + { + CompactionManagerJmxOperations proxy = jmxClient.proxy(CompactionManagerJmxOperations.class, + COMPACTION_MANAGER_OBJ_NAME); + String errMsg + = "compaction process with compaction type " + compactionType + " must not be null when compactionId is not provided"; + proxy.stopCompaction(Objects.requireNonNull(compactionType, errMsg)); + } + else + { + throw new IllegalArgumentException("compaction type " + compactionType + " is not supported"); + } + } + else + { + throw new IllegalArgumentException("compaction type " + compactionType + " is null or empty"); + } + } + + @Override + public List supportedCompactionTypes() + { + return SUPPORTED_COMPACTION_TYPES; + } } diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CompactionType.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CompactionType.java new file mode 100644 index 000000000..54db89c10 --- /dev/null +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CompactionType.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.adapters.base; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.annotation.JsonCreator; + +/** + * Supported compaction types based on Cassandra's OperationType enum + */ +public enum CompactionType +{ + CLEANUP, + SCRUB, + UPGRADE_SSTABLES, + VERIFY, + RELOCATE, + GARBAGE_COLLECT, + ANTICOMPACTION, + VALIDATION, + INDEX_BUILD, + VIEW_BUILD, + COMPACTION, + TOMBSTONE_COMPACTION, + KEY_CACHE_SAVE, + ROW_CACHE_SAVE, + COUNTER_CACHE_SAVE, + INDEX_SUMMARY; + + private static final List SUPPORTED_COMPACTION_TYPES = + Arrays.stream(org.apache.cassandra.sidecar.adapters.base.CompactionType.values()) + .map(org.apache.cassandra.sidecar.adapters.base.CompactionType::name) + .collect(Collectors.toList()); + + @Override + public String toString() + { + return name(); + } + + /** + * Case-insensitive factory method for Jackson deserialization + * @return {@link CompactionType} from string + */ + @JsonCreator + public static CompactionType fromString(String name) + { + if (name == null || name.trim().isEmpty()) + { + return null; + } + try + { + return valueOf(name.trim().toUpperCase(Locale.ROOT)); + } + catch (IllegalArgumentException unknownEnum) + { + throw new IllegalArgumentException( + String.format("Unsupported compactionType: '%s'. Valid types are: %s", + name, SUPPORTED_COMPACTION_TYPES)); + } + } +} diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/CompactionManagerJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/CompactionManagerJmxOperations.java index a054c91df..aad573cdd 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/CompactionManagerJmxOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/CompactionManagerJmxOperations.java @@ -34,4 +34,16 @@ public interface CompactionManagerJmxOperations * @return list of compaction info maps */ List> getCompactions(); + + /** + * Stop compaction by type + * @throws IllegalArgumentException when compaction type is null + */ + void stopCompaction(String type); + + /** + * Stop compaction by ID + * @throws IllegalArgumentException when compaction ID is null + */ + void stopCompactionById(String compactionId); } diff --git a/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperationsTest.java b/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperationsTest.java new file mode 100644 index 000000000..ce57869d8 --- /dev/null +++ b/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperationsTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.adapters.base.jmx.CompactionManagerJmxOperations; +import org.apache.cassandra.sidecar.common.server.JmxClient; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.apache.cassandra.sidecar.adapters.base.jmx.CompactionManagerJmxOperations.COMPACTION_MANAGER_OBJ_NAME; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +/** + * Tests for {@link CassandraCompactionManagerOperations} class + */ +class CassandraCompactionManagerOperationsTest +{ + private CassandraCompactionManagerOperations compactionManagerOperations; + private JmxClient mockJmxClient; + private CompactionManagerJmxOperations mockJmxOperations; + + @BeforeEach + void setUp() + { + mockJmxClient = mock(JmxClient.class); + mockJmxOperations = mock(CompactionManagerJmxOperations.class); + compactionManagerOperations = new CassandraCompactionManagerOperations(mockJmxClient); + + // Setup JMX proxy mock + when(mockJmxClient.proxy(CompactionManagerJmxOperations.class, COMPACTION_MANAGER_OBJ_NAME)) + .thenReturn(mockJmxOperations); + } + + @Test + void testStopCompactionByIdOnly() + { + // Test stopCompactionById called when providing compactionId + String compactionId = "abc-123"; + compactionManagerOperations.stopCompactionById(compactionId); + + verify(mockJmxOperations, times(1)).stopCompactionById(compactionId); + verify(mockJmxOperations, times(0)).stopCompaction(org.mockito.ArgumentMatchers.anyString()); + } + + @Test + void testStopCompactionByTypeOnly() + { + // Test stopCompaction called when no compactionId provided + String compactionType = "COMPACTION"; + compactionManagerOperations.stopCompaction(compactionType); + + verify(mockJmxOperations, times(1)).stopCompaction(compactionType); + verify(mockJmxOperations, times(0)).stopCompactionById(org.mockito.ArgumentMatchers.anyString()); + } + + @Test + void testStopCompactionByIdWithWhitespace() + { + // Test trim does not result in empty string + String compactionId = " abc-123 "; + compactionManagerOperations.stopCompactionById(compactionId); + + verify(mockJmxOperations, times(1)).stopCompactionById(compactionId); + verify(mockJmxOperations, times(0)).stopCompaction(org.mockito.ArgumentMatchers.anyString()); + } + + @Test + void testStopCompactionAllSupportedTypes() + { + // Test no failures upon any supported type being provided as param + String[] supportedTypes = { + "COMPACTION", "VALIDATION", "KEY_CACHE_SAVE", "ROW_CACHE_SAVE", + "COUNTER_CACHE_SAVE", "CLEANUP", "SCRUB", "UPGRADE_SSTABLES", + "INDEX_BUILD", "TOMBSTONE_COMPACTION", "ANTICOMPACTION", + "VERIFY", "VIEW_BUILD", "INDEX_SUMMARY", "RELOCATE", + "GARBAGE_COLLECT" + }; + + for (String type : supportedTypes) + { + compactionManagerOperations.stopCompaction(type); + verify(mockJmxOperations, times(1)).stopCompaction(type); + } + } + + @Test + void testStopCompactionCatchesUnsupportedType() + { + String compactionType = "MAJOR_COMPACTION"; + assertThrows(IllegalArgumentException.class, + () -> compactionManagerOperations.stopCompaction(compactionType)); + } + + @Test + void testStopCompactionJmxProxyCalledOnce() + { + // Test JMX proxy obtained exactly once per call + compactionManagerOperations.stopCompactionById("test-id"); + + verify(mockJmxClient, times(1)) + .proxy(CompactionManagerJmxOperations.class, COMPACTION_MANAGER_OBJ_NAME); + } +} diff --git a/adapters/adapters-cassandra50/build.gradle b/adapters/adapters-cassandra50/build.gradle new file mode 100644 index 000000000..b52a18d6f --- /dev/null +++ b/adapters/adapters-cassandra50/build.gradle @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.nio.file.Paths + +plugins { + id 'java-library' + id 'idea' + id 'maven-publish' + id "com.github.spotbugs" +} + +apply from: "$rootDir/gradle/common/publishing.gradle" + +sourceCompatibility = JavaVersion.VERSION_11 + +test { + useJUnitPlatform() + maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 + reports { + junitXml.setRequired(true) + def destDir = Paths.get(rootProject.rootDir.absolutePath, "build", "test-results", "adapters-cassandra50").toFile() + println("Destination directory for adapters-cassandra50 tests: ${destDir}") + junitXml.getOutputLocation().set(destDir) + html.setRequired(true) + html.getOutputLocation().set(destDir) + } +} + +dependencies { + api(project(":server-common")) + api(project(":adapters:adapters-base")) + api(project(":adapters:adapters-cassandra41")) + + compileOnly('org.jetbrains:annotations:23.0.0') + compileOnly('com.datastax.cassandra:cassandra-driver-core:3.11.3') + implementation("org.slf4j:slf4j-api:${project.slf4jVersion}") +} + +spotbugsTest.enabled = false \ No newline at end of file diff --git a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java new file mode 100644 index 000000000..f7ac798ba --- /dev/null +++ b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.cassandra50; + +import java.net.InetSocketAddress; + +import org.apache.cassandra.sidecar.adapters.base.CassandraAdapter; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations; +import org.apache.cassandra.sidecar.common.server.ICassandraAdapter; +import org.apache.cassandra.sidecar.common.server.JmxClient; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; +import org.apache.cassandra.sidecar.common.server.utils.DriverUtils; +import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher; +import org.jetbrains.annotations.NotNull; + +/** + * A {@link ICassandraAdapter} implementation for Cassandra 5.0 + */ +public class Cassandra50Adapter extends CassandraAdapter +{ + public Cassandra50Adapter(DnsResolver dnsResolver, + JmxClient jmxClient, + CQLSessionProvider session, + InetSocketAddress localNativeTransportAddress, + DriverUtils driverUtils, + TableSchemaFetcher tableSchemaFetcher) + { + super(dnsResolver, jmxClient, session, localNativeTransportAddress, driverUtils, tableSchemaFetcher); + } + + /** + * {@inheritDoc} + */ + @Override + @NotNull + protected StorageOperations createStorageOperations(DnsResolver dnsResolver, JmxClient jmxClient) + { + return new Cassandra50StorageOperations(jmxClient, dnsResolver); + } + + /** + * {@inheritDoc} + * + * Returns Cassandra 4.x-specific CompactionManagerOperations that excludes unsupported types + */ + @Override + @NotNull + protected CompactionManagerOperations createCompactionManagerOperations(JmxClient jmxClient) + { + return new Cassandra50CompactionManagerOperations(jmxClient); + } +} diff --git a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50CompactionManagerOperations.java b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50CompactionManagerOperations.java new file mode 100644 index 000000000..a6f6094f7 --- /dev/null +++ b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50CompactionManagerOperations.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.cassandra50; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.cassandra.sidecar.adapters.base.CassandraCompactionManagerOperations; +import org.apache.cassandra.sidecar.common.server.JmxClient; + +/** + * Cassandra 5.0 implementation of CompactionManagerOperations that supports MAJOR_COMPACTION + * which is only available in Cassandra 5.x versions + */ +public class Cassandra50CompactionManagerOperations extends CassandraCompactionManagerOperations +{ + private static final List SUPPORTED_COMPACTION_TYPES = + Arrays.stream(org.apache.cassandra.sidecar.adapters.cassandra50.CompactionType.values()) + .map(org.apache.cassandra.sidecar.adapters.cassandra50.CompactionType::name) + .collect(Collectors.toList()); + /** + * Creates a new instance with the provided {@link JmxClient} + * + * @param jmxClient the JMX client used to communicate with the Cassandra instance + */ + public Cassandra50CompactionManagerOperations(JmxClient jmxClient) + { + super(jmxClient); + } + + /** + * {@inheritDoc} + * + * Cassandra 5.x supports MAJOR_COMPACTION operation type + */ + @Override + public List supportedCompactionTypes() + { + return SUPPORTED_COMPACTION_TYPES; + } +} diff --git a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Factory.java b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Factory.java new file mode 100644 index 000000000..131a2e028 --- /dev/null +++ b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Factory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.cassandra50; + +import java.net.InetSocketAddress; +import java.util.Objects; + +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.common.server.ICassandraAdapter; +import org.apache.cassandra.sidecar.common.server.ICassandraFactory; +import org.apache.cassandra.sidecar.common.server.JmxClient; +import org.apache.cassandra.sidecar.common.server.MinimumVersion; +import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; +import org.apache.cassandra.sidecar.common.server.utils.DriverUtils; +import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher; + +/** + * Factory to produce the Cassandra 5.x adapter + */ +@MinimumVersion("5.0.0") +public class Cassandra50Factory implements ICassandraFactory +{ + private final DnsResolver dnsResolver; + private final DriverUtils driverUtils; + private final TableSchemaFetcher tableSchemaFetcher; + + public Cassandra50Factory(DnsResolver dnsResolver, DriverUtils driverUtils, TableSchemaFetcher tableSchemaFetcher) + { + this.dnsResolver = Objects.requireNonNull(dnsResolver, "dnsResolver is required"); + this.driverUtils = Objects.requireNonNull(driverUtils, "driverUtils is required"); + this.tableSchemaFetcher = Objects.requireNonNull(tableSchemaFetcher, "tableSchemaFetcher is required"); + } + + /** + * Returns a new adapter for Cassandra 5.x clusters. + * + * @param session the session to the Cassandra database + * @param jmxClient the JMX client to connect to the Cassandra database + * @param localNativeTransportAddress the native transport address and port of the instance + * @return a new adapter for the 5.x clusters + */ + @Override + public ICassandraAdapter create(CQLSessionProvider session, JmxClient jmxClient, + InetSocketAddress localNativeTransportAddress) + { + return new Cassandra50Adapter(dnsResolver, jmxClient, session, localNativeTransportAddress, driverUtils, tableSchemaFetcher); + } +} diff --git a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50StorageOperations.java b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50StorageOperations.java new file mode 100644 index 000000000..6e029d1f0 --- /dev/null +++ b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50StorageOperations.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.cassandra50; + +import org.apache.cassandra.sidecar.adapters.base.RingProvider; +import org.apache.cassandra.sidecar.adapters.base.TokenRangeReplicaProvider; +import org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41StorageOperations; +import org.apache.cassandra.sidecar.common.server.JmxClient; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; + +/** + * An implementation of the {@link StorageOperations} that interfaces with Cassandra 5.0 and later + */ +public class Cassandra50StorageOperations extends Cassandra41StorageOperations +{ + /** + * Creates a new instance with the provided {@link JmxClient} and {@link DnsResolver} + * + * @param jmxClient the JMX client used to communicate with the Cassandra instance + * @param dnsResolver the DNS resolver used to lookup replicas + */ + public Cassandra50StorageOperations(JmxClient jmxClient, DnsResolver dnsResolver) + { + super(jmxClient, dnsResolver); + } + + /** + * Creates a new instances with the provided {@link JmxClient}, {@link RingProvider}, and + * {@link TokenRangeReplicaProvider}. This constructor is exposed for extensibility. + * + * @param jmxClient the JMX client used to communicate with the Cassandra instance + * @param ringProvider the ring provider instance + * @param tokenRangeReplicaProvider the token range replica provider + */ + public Cassandra50StorageOperations(JmxClient jmxClient, + RingProvider ringProvider, + TokenRangeReplicaProvider tokenRangeReplicaProvider) + { + super(jmxClient, ringProvider, tokenRangeReplicaProvider); + } +} diff --git a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/CompactionType.java b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/CompactionType.java new file mode 100644 index 000000000..202831fc6 --- /dev/null +++ b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/CompactionType.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.adapters.cassandra50; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.annotation.JsonCreator; + +/** + * Supported compaction types based on Cassandra's OperationType enum + */ +public enum CompactionType +{ + CLEANUP, + SCRUB, + UPGRADE_SSTABLES, + VERIFY, + RELOCATE, + GARBAGE_COLLECT, + ANTICOMPACTION, + VALIDATION, + INDEX_BUILD, + VIEW_BUILD, + COMPACTION, + TOMBSTONE_COMPACTION, + KEY_CACHE_SAVE, + ROW_CACHE_SAVE, + COUNTER_CACHE_SAVE, + INDEX_SUMMARY, + MAJOR_COMPACTION; + + private static final List SUPPORTED_COMPACTION_TYPES = + Arrays.stream(org.apache.cassandra.sidecar.adapters.base.CompactionType.values()) + .map(org.apache.cassandra.sidecar.adapters.base.CompactionType::name) + .collect(Collectors.toList()); + + @Override + public String toString() + { + return name(); + } + + /** + * Case-insensitive factory method for Jackson deserialization + * @return {@link CompactionType} from string + */ + @JsonCreator + public static CompactionType fromString(String name) + { + if (name == null || name.trim().isEmpty()) + { + return null; + } + try + { + return valueOf(name.trim().toUpperCase(Locale.ROOT)); + } + catch (IllegalArgumentException unknownEnum) + { + throw new IllegalArgumentException( + String.format("Unsupported compactionType: '%s'. Valid types are: %s", + name, SUPPORTED_COMPACTION_TYPES)); + } + } +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index 8af3b9666..8269bf042 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -139,6 +139,7 @@ public final class ApiEndpointsV1 public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/connected-clients"; public static final String COMPACTION_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/compaction"; + private static final String OPERATION_ROUTE = "/operations"; private static final String OPERATIONAL_JOBS = "/operational-jobs"; private static final String PER_OPERATIONAL_JOB = OPERATIONAL_JOBS + '/' + OPERATIONAL_JOB_ID_PATH_PARAM; public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS; @@ -148,7 +149,7 @@ public final class ApiEndpointsV1 public static final String NODE_DRAIN_ROUTE = API_V1 + CASSANDRA + "/operations/drain"; public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams"; public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + PER_TABLE + "/stats"; - + public static final String COMPACTION_STOP_ROUTE = API_V1 + CASSANDRA + OPERATION_ROUTE + "/compaction/stop"; // Live Migration APIs public static final String LIVE_MIGRATION_API_PREFIX = API_V1 + "/live-migration"; diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/CompactionStopStatus.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/CompactionStopStatus.java new file mode 100644 index 000000000..a2c9e3aed --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/CompactionStopStatus.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.common.data; + +/** + * Status values for compaction stop operations + */ +public enum CompactionStopStatus +{ + /** + * Compaction stop request submitted to Cassandra - ongoing compactions now stopping + */ + SUBMITTED, + + /** + * Compaction stop request failed + */ + FAILED; +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/CompactionStopRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/CompactionStopRequest.java new file mode 100644 index 000000000..36b27224d --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/CompactionStopRequest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload; +import org.apache.cassandra.sidecar.common.response.CompactionStopResponse; + +/** + * Represents a request to execute compaction stop operation + */ +public class CompactionStopRequest extends JsonRequest +{ + private final CompactionStopRequestPayload payload; + + /** + * Constructs a request to execute a compaction stop operation with a payload + * + * @param payload the payload containing compaction type or ID to stop + */ + public CompactionStopRequest(CompactionStopRequestPayload payload) + { + super(ApiEndpointsV1.COMPACTION_STOP_ROUTE); + this.payload = payload; + } + + /** + * {@inheritDoc} + */ + @Override + public HttpMethod method() + { + return HttpMethod.PUT; + } + + /** + * {@inheritDoc} + */ + @Override + public Object requestBody() + { + return payload; + } +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayload.java new file mode 100644 index 000000000..b9998a4e3 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayload.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.Locale; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Request payload for stopping compaction operations. + * + *

Valid JSON:

+ *
+ *   { "compactionType": "COMPACTION", "compactionId": "abc-123" }
+ *   { "compactionType": "VALIDATION" }
+ * 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class CompactionStopRequestPayload +{ + public static final String COMPACTION_TYPE_KEY = "compactionType"; + public static final String COMPACTION_ID_KEY = "compactionId"; + + private final String compactionType; + private final String compactionId; + + /** + * Creates a new CompactionStopRequestPayload + * + * @param compactionType the type of compaction to stop (e.g., COMPACTION, VALIDATION, etc.) + * @param compactionId optional ID of a specific compaction to stop + */ + @JsonCreator + public CompactionStopRequestPayload(@JsonProperty(value = COMPACTION_TYPE_KEY) String compactionType, + @JsonProperty(value = COMPACTION_ID_KEY) String compactionId) + { + // Normalize compactionType: trim whitespace and convert to uppercase + this.compactionType = normalizeCompactionType(compactionType); + this.compactionId = compactionId; + } + + /** + * Normalizes the compaction type by trimming whitespace and converting to uppercase. + * Returns null for null or empty strings. + * + * @param compactionType the raw compaction type string + * @return normalized compaction type or null + */ + private static String normalizeCompactionType(String compactionType) + { + if (compactionType == null || compactionType.trim().isEmpty()) + { + return null; + } + return compactionType.trim().toUpperCase(Locale.ROOT); + } + + /** + * @return the type of compaction to stop + */ + @JsonProperty(COMPACTION_TYPE_KEY) + public String compactionType() + { + return this.compactionType; + } + + /** + * @return the ID of a specific compaction to stop, or null to stop all specified type + */ + @JsonProperty(COMPACTION_ID_KEY) + public String compactionId() + { + return this.compactionId != null ? this.compactionId.trim() : null; + } + + /** + * Checks compaction ID valid - not null or empty post-trim + * */ + public boolean hasValidCompactionId() + { + return this.compactionId != null && !this.compactionId.trim().isEmpty(); + } + + /** + * Checks compaction type not null and not empty for invalid compactionId cases + * */ + public boolean hasValidCompactionType() + { + return this.compactionType != null && !this.compactionType.isEmpty(); + } + + /** + * Checks at least one valid parameter provided + * + * @return true if either compaction ID or compaction type is valid, false otherwise + */ + public boolean atLeastOneParamProvided() + { + return hasValidCompactionId() || hasValidCompactionType(); + } + + @Override + public String toString() + { + return "CompactionStopRequestPayload{" + + "compactionType='" + compactionType + "'" + + ", compactionId='" + compactionId + "'" + + "}"; + } +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/CompactionStopResponse.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/CompactionStopResponse.java new file mode 100644 index 000000000..ebdbac7b2 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/CompactionStopResponse.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; +import org.apache.cassandra.sidecar.common.data.CompactionStopStatus; + +/** + * Response class for the Compaction Stop API + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class CompactionStopResponse +{ + public static final String COMPACTION_TYPE_KEY = "compactionType"; + public static final String COMPACTION_ID_KEY = "compactionId"; + public static final String STATUS_KEY = "status"; + private final String compactionType; + private final String compactionId; + private final CompactionStopStatus status; + + private CompactionStopResponse(Builder builder) + { + this.compactionType = builder.compactionType; + this.compactionId = builder.compactionId; + this.status = builder.status; + } + + /** + * Constructs a new {@link CompactionStopResponse}. + * + * @param compactionType the type of compaction that was requested to stop + * @param compactionId the ID of the compaction that was requested to stop + * @param status the status of the stop operation (e.g., "PENDING", "FAILED") + */ + @JsonCreator + public CompactionStopResponse(@JsonProperty(COMPACTION_TYPE_KEY) String compactionType, + @JsonProperty(COMPACTION_ID_KEY) String compactionId, + @JsonProperty(STATUS_KEY) CompactionStopStatus status) + { + this.compactionType = compactionType; + this.compactionId = compactionId; + this.status = status; + } + + /** + * @return the type of compaction that was requested to stop + */ + @JsonProperty(COMPACTION_TYPE_KEY) + public String compactionType() + { + return compactionType; + } + + /** + * @return the ID of the compaction that was requested to stop + */ + @JsonProperty(COMPACTION_ID_KEY) + public String compactionId() + { + return compactionId; + } + + /** + * @return the status of the stop operation + */ + @JsonProperty(STATUS_KEY) + public CompactionStopStatus status() + { + return status; + } + + @Override + public String toString() + { + return String.format( + "CompactionStopResponse{compactionType='%s', compactionId='%s', status='%s'}", + compactionType, compactionId, status + ); + } + + public static Builder builder() + { + return new Builder(); + } + + /** + * {@code CompactionStopResponse} builder static inner class. + */ + public static final class Builder implements DataObjectBuilder + { + private String compactionType; + private String compactionId; + private CompactionStopStatus status; + + private Builder() + { + } + + @Override + public Builder self() + { + return this; + } + + /** + * Sets the {@code compactionType} and returns a reference to this Builder enabling method chaining. + * + * @param compactionType the {@code compactionType} to set + * @return a reference to this Builder + */ + public Builder compactionType(String compactionType) + { + return update(b -> b.compactionType = compactionType); + } + + /** + * Sets the {@code compactionId} and returns a reference to this Builder enabling method chaining. + * + * @param compactionId the {@code compactionId} to set + * @return a reference to this Builder + */ + public Builder compactionId(String compactionId) + { + return update(b -> b.compactionId = compactionId); + } + + /** + * Sets the {@code status} and returns a reference to this Builder enabling method chaining. + * + * @param status the {@code status} to set + * @return a reference to this Builder + */ + public Builder status(CompactionStopStatus status) + { + return update(b -> b.status = status); + } + + + /** + * Returns a {@code CompactionStopResponse} built from the parameters previously set. + * + * @return a {@code CompactionStopResponse} built with parameters of this {@code CompactionStopResponse.Builder} + */ + @Override + public CompactionStopResponse build() + { + return new CompactionStopResponse(this); + } + } +} diff --git a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayloadTest.java b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayloadTest.java new file mode 100644 index 000000000..80e86ea31 --- /dev/null +++ b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayloadTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request.data; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link CompactionStopRequestPayload} serialization and deserialization + */ +class CompactionStopRequestPayloadTest +{ + private static final ObjectMapper MAPPER + = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); + + /** + * All known compaction types across all Cassandra versions for testing purposes + */ + private static final String[] ALL_COMPACTION_TYPES = { + "CLEANUP", "SCRUB", "UPGRADE_SSTABLES", "VERIFY", "RELOCATE", + "GARBAGE_COLLECT", "ANTICOMPACTION", "VALIDATION", "INDEX_BUILD", + "VIEW_BUILD", "COMPACTION", "TOMBSTONE_COMPACTION", "KEY_CACHE_SAVE", + "ROW_CACHE_SAVE", "COUNTER_CACHE_SAVE", "INDEX_SUMMARY", "MAJOR_COMPACTION" + }; + + @Test + void testSerDeserWithBothFields() throws JsonProcessingException + { + CompactionStopRequestPayload payload = new CompactionStopRequestPayload("COMPACTION", "abc-123"); + String json = MAPPER.writeValueAsString(payload); + assertThat(json).isEqualTo("{\"compactionType\":\"COMPACTION\",\"compactionId\":\"abc-123\"}"); + + CompactionStopRequestPayload deser = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(deser.compactionType()).isEqualTo(payload.compactionType()); + assertThat(deser.compactionId()).isEqualTo(payload.compactionId()); + } + + @Test + void testSerDeserWithTypeOnly() throws JsonProcessingException + { + CompactionStopRequestPayload payload = new CompactionStopRequestPayload("VALIDATION", null); + String json = MAPPER.writeValueAsString(payload); + assertThat(json).isEqualTo("{\"compactionType\":\"VALIDATION\"}"); + + CompactionStopRequestPayload deser = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(deser.compactionType()).isEqualTo("VALIDATION"); + assertThat(deser.compactionId()).isNull(); + } + + @Test + void testSerDeserWithIdOnly() throws JsonProcessingException + { + CompactionStopRequestPayload payload = new CompactionStopRequestPayload(null, "xyz-456"); + String json = MAPPER.writeValueAsString(payload); + assertThat(json).isEqualTo("{\"compactionId\":\"xyz-456\"}"); + + CompactionStopRequestPayload deser = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(deser.compactionType()).isNull(); + assertThat(deser.compactionId()).isEqualTo("xyz-456"); + } + + @Test + void testSerDeserWithBothNull() throws JsonProcessingException + { + CompactionStopRequestPayload payload = new CompactionStopRequestPayload(null, null); + String json = MAPPER.writeValueAsString(payload); + assertThat(json).isEqualTo("{}"); + + CompactionStopRequestPayload deser = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(deser.compactionType()).isNull(); + assertThat(deser.compactionId()).isNull(); + } + + @Test + void testDeserFromJsonWithBothFields() throws JsonProcessingException + { + String json = "{\"compactionType\":\"CLEANUP\",\"compactionId\":\"test-123\"}"; + CompactionStopRequestPayload payload = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(payload.compactionType()).isEqualTo("CLEANUP"); + assertThat(payload.compactionId()).isEqualTo("test-123"); + } + + @Test + void testDeserFromJsonWithTypeOnly() throws JsonProcessingException + { + String json = "{\"compactionType\":\"SCRUB\"}"; + CompactionStopRequestPayload payload = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(payload.compactionType()).isEqualTo("SCRUB"); + assertThat(payload.compactionId()).isNull(); + } + + @Test + void testDeserFromJsonWithIdOnly() throws JsonProcessingException + { + String json = "{\"compactionId\":\"unique-compaction-id\"}"; + CompactionStopRequestPayload payload = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(payload.compactionType()).isNull(); + assertThat(payload.compactionId()).isEqualTo("unique-compaction-id"); + } + + @Test + void testDeserFromEmptyJson() throws JsonProcessingException + { + String json = "{}"; + CompactionStopRequestPayload payload = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(payload.compactionType()).isNull(); + assertThat(payload.compactionId()).isNull(); + } + + @Test + void testDeserializeWithEmptyStrings() throws JsonProcessingException + { + String json = "{\"compactionType\":\"\",\"compactionId\":\"\"}"; + CompactionStopRequestPayload payload = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(payload.compactionType()).isNull(); + assertThat(payload.compactionId()).isEmpty(); + } + + @Test + void testDeserializeWithWhitespace() throws JsonProcessingException + { + String json = "{\"compactionType\":\" COMPACTION \",\"compactionId\":\" test-id \"}"; + CompactionStopRequestPayload payload = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(payload.compactionType()).isEqualTo("COMPACTION"); + assertThat(payload.compactionId()).isEqualTo("test-id"); + } + + @Test + void testToString() + { + CompactionStopRequestPayload payload = new CompactionStopRequestPayload("COMPACTION", "abc-123"); + String toString = payload.toString(); + assertThat(toString).contains("compaction"); + assertThat(toString).contains("abc-123"); + assertThat(toString).contains("CompactionStopRequestPayload"); + } + + @Test + void testAllSupportedCompactionTypes() throws JsonProcessingException + { + // Check each compactionType field for CompactionStopRequestPayload is serialized/deserialized correctly + for (String compactionType : ALL_COMPACTION_TYPES) + { + CompactionStopRequestPayload payload = new CompactionStopRequestPayload(compactionType, null); + String json = MAPPER.writeValueAsString(payload); + assertThat(json).contains(compactionType); + + CompactionStopRequestPayload deser = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(deser.compactionType()).isEqualTo(compactionType); + } + } + + @Test + void testCasePreservation() throws JsonProcessingException + { + // Test preserved in serialization/deserialization + CompactionStopRequestPayload lowerCase = new CompactionStopRequestPayload("COMPACTION", "Test-ID-123"); + String json = MAPPER.writeValueAsString(lowerCase); + assertThat(json).contains("compaction"); + assertThat(json).contains("Test-ID-123"); + + CompactionStopRequestPayload deser = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(deser.compactionType()).isEqualTo("COMPACTION"); + assertThat(deser.compactionId()).isEqualTo("Test-ID-123"); + } + + @Test + void testHasValidCompactionIdWithBothFields() throws JsonProcessingException + { + String json = "{\"compactionType\":\"VALIDATION\",\"compactionId\":\"xyz-456\"}"; + CompactionStopRequestPayload payload = MAPPER.readValue(json, CompactionStopRequestPayload.class); + assertThat(payload.hasValidCompactionId()).isTrue(); + assertThat(payload.hasValidCompactionType()).isTrue(); + assertThat(payload.compactionId()).isEqualTo("xyz-456"); + assertThat(payload.compactionType()).isEqualTo("VALIDATION"); + } +} diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java index c3442373a..2b290efbf 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java @@ -34,6 +34,7 @@ import org.apache.cassandra.sidecar.common.request.CleanSSTableUploadSessionRequest; import org.apache.cassandra.sidecar.common.request.ClearSnapshotRequest; import org.apache.cassandra.sidecar.common.request.CompactionStatsRequest; +import org.apache.cassandra.sidecar.common.request.CompactionStopRequest; import org.apache.cassandra.sidecar.common.request.ConnectedClientStatsRequest; import org.apache.cassandra.sidecar.common.request.CreateSnapshotRequest; import org.apache.cassandra.sidecar.common.request.GossipHealthRequest; @@ -62,6 +63,7 @@ import org.apache.cassandra.sidecar.common.request.TimeSkewRequest; import org.apache.cassandra.sidecar.common.request.TokenRangeReplicasRequest; import org.apache.cassandra.sidecar.common.request.UploadSSTableRequest; +import org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload; import org.apache.cassandra.sidecar.common.request.data.Digest; import org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload; import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse; @@ -556,6 +558,18 @@ public Builder compactionStatsRequest() return request(new CompactionStatsRequest()); } + /** + * Sets the {@code request} to be a {@link CompactionStopRequest} and returns a reference to this Builder + * enabling method chaining. + * + * @param payload the payload containing compaction type or ID to stop + * @return a reference to this Builder + */ + public Builder compactionStopRequest(CompactionStopRequestPayload payload) + { + return request(new CompactionStopRequest(payload)); + } + /** * Sets the {@code request} to be a {@link TableStatsRequest} and returns a reference to this Builder * enabling method chaining. diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index dacf4ded8..15d0db853 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -55,6 +55,7 @@ import org.apache.cassandra.sidecar.common.request.UpdateServiceConfigRequest; import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.request.data.AllServicesConfigPayload; +import org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload; import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload; import org.apache.cassandra.sidecar.common.request.data.Digest; @@ -63,6 +64,7 @@ import org.apache.cassandra.sidecar.common.request.data.UpdateCdcServiceConfigPayload; import org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.response.CompactionStatsResponse; +import org.apache.cassandra.sidecar.common.response.CompactionStopResponse; import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; import org.apache.cassandra.sidecar.common.response.HealthResponse; @@ -768,6 +770,22 @@ public CompletableFuture compactionStats(SidecarInstanc .build()); } + /** + * Executes the compaction stop request using the default retry policy and provided {@code instance}. + * + * @param instance the instance where the request will be executed + * @param payload the payload containing compaction type or ID to stop + * @return a completable future of the compaction stop response + */ + public CompletableFuture compactionStop(SidecarInstance instance, + CompactionStopRequestPayload payload) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .compactionStopRequest(payload) + .build()); + } + /** * Executes the table stats request using the default retry policy and provided {@code instance}. * diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index 032a937c5..08d41c748 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -63,8 +63,9 @@ import org.apache.cassandra.sidecar.client.retry.RetryAction; import org.apache.cassandra.sidecar.client.retry.RetryPolicy; import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.data.CompactionStopStatus; +import org.apache.cassandra.sidecar.common.data.Lifecycle; import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState; -import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus; import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets; import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest; @@ -72,12 +73,14 @@ import org.apache.cassandra.sidecar.common.request.Request; import org.apache.cassandra.sidecar.common.request.Service; import org.apache.cassandra.sidecar.common.request.data.AllServicesConfigPayload; +import org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload; import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.request.data.MD5Digest; import org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload; import org.apache.cassandra.sidecar.common.request.data.UpdateCdcServiceConfigPayload; import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; import org.apache.cassandra.sidecar.common.response.CompactionStatsResponse; +import org.apache.cassandra.sidecar.common.response.CompactionStopResponse; import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; import org.apache.cassandra.sidecar.common.response.HealthResponse; @@ -88,7 +91,6 @@ import org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse; import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse; import org.apache.cassandra.sidecar.common.response.LiveMigrationStatus; -import org.apache.cassandra.sidecar.common.response.LiveMigrationStatus.MigrationState; import org.apache.cassandra.sidecar.common.response.NodeSettings; import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import org.apache.cassandra.sidecar.common.response.RingResponse; @@ -98,6 +100,7 @@ import org.apache.cassandra.sidecar.common.response.TableStatsResponse; import org.apache.cassandra.sidecar.common.response.TimeSkewResponse; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.response.LiveMigrationStatus.MigrationState; import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo; import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry; import org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload; @@ -1753,6 +1756,63 @@ public void testCompactionStats() throws Exception } } + @Test + void testCompactionStop() throws Exception + { + SidecarInstanceImpl instance = instances.get(0); + MockWebServer server = servers.get(0); + + // Test stop by type + String responseByType = "{\"status\":\"SUBMITTED\",\"compactionType\":\"COMPACTION\"}"; + server.enqueue(new MockResponse().setResponseCode(OK.code()).setBody(responseByType)); + + CompactionStopRequestPayload stopByType = new CompactionStopRequestPayload("COMPACTION", null); + CompletableFuture response1 = client.compactionStop(instance, stopByType); + assertThat(response1).isNotNull(); + + CompactionStopResponse compactionResponse1 = response1.get(); + + assertThat(compactionResponse1.compactionType()).isEqualTo("COMPACTION"); + assertThat(compactionResponse1.compactionId()).isEqualTo(null); + assertThat(compactionResponse1.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + + // Test stop by ID + String responseById = "{\"status\":\"SUBMITTED\",\"compactionId\":\"test-id-1\"}"; + server.enqueue(new MockResponse().setResponseCode(OK.code()).setBody(responseById)); + + CompactionStopRequestPayload stopById = new CompactionStopRequestPayload(null, "test-id-1"); + CompletableFuture response2 = client.compactionStop(instance, stopById); + CompactionStopResponse compactionResponse2 = response2.get(); + + assertThat(compactionResponse2.compactionType()).isEqualTo(null); + assertThat(compactionResponse2.compactionId()).isEqualTo("test-id-1"); + assertThat(compactionResponse2.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + + // Test id precedence when both inputs provided + String responseBothInputs = "{\"status\":\"SUBMITTED\"," + + "\"compactionId\":\"test-id-2\", " + + "\"compactionType\":\"VALIDATION\"}"; + server.enqueue(new MockResponse().setResponseCode(OK.code()).setBody(responseBothInputs)); + + CompactionStopRequestPayload stopAfterBothInputs + = new CompactionStopRequestPayload("VALIDATION", "test-id-2"); + CompletableFuture response3 = client.compactionStop(instance, stopAfterBothInputs); + assertThat(response3).isNotNull(); + + CompactionStopResponse compactionResponse3 = response3.get(); + assertThat(compactionResponse3.compactionType()).isEqualTo("VALIDATION"); + assertThat(compactionResponse3.compactionId()).isEqualTo("test-id-2"); + assertThat(compactionResponse3.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + + // Verify the request body contains both fields when both are provided + server.takeRequest(); // First request (stop by type) + server.takeRequest(); // Second request (stop by ID) + RecordedRequest thirdRequest = server.takeRequest(); // Third request (both inputs) + String requestBody = thirdRequest.getBody().readString(Charset.defaultCharset()); + assertThat(requestBody).contains("\"compactionId\":\"test-id-2\""); + assertThat(requestBody).contains("\"compactionType\":\"VALIDATION\""); + } + @Test public void testCompactionStatsServerError() throws Exception { @@ -2081,7 +2141,7 @@ void testNodeLifecycleInfo() throws Exception assertThat(result).isNotNull(); assertThat(result.currentState()).isEqualTo(CassandraState.RUNNING); assertThat(result.desiredState()).isEqualTo(CassandraState.RUNNING); - assertThat(result.status()).isEqualTo(OperationStatus.CONVERGED); + assertThat(result.status()).isEqualTo(Lifecycle.OperationStatus.CONVERGED); assertThat(result.lastUpdate()).isEqualTo("Instance has started"); validateResponseServed(ApiEndpointsV1.LIFECYCLE_ROUTE); @@ -2102,7 +2162,7 @@ void testNodeUpdateLifecycle() throws Exception assertThat(result).isNotNull(); assertThat(result.currentState()).isEqualTo(CassandraState.RUNNING); assertThat(result.desiredState()).isEqualTo(CassandraState.STOPPED); - assertThat(result.status()).isEqualTo(OperationStatus.CONVERGING); + assertThat(result.status()).isEqualTo(Lifecycle.OperationStatus.CONVERGING); assertThat(result.lastUpdate()).isEqualTo("Submitting stop task for instance"); validateResponseServed(ApiEndpointsV1.LIFECYCLE_ROUTE, request -> { diff --git a/integration-framework/build.gradle b/integration-framework/build.gradle index 5edf49ea9..62248946c 100644 --- a/integration-framework/build.gradle +++ b/integration-framework/build.gradle @@ -31,6 +31,7 @@ test { dependencies { implementation(project(":adapters:adapters-base")) implementation(project(":adapters:adapters-cassandra41")) + implementation(project(":adapters:adapters-cassandra50")) // Needed by the Cassandra dtest framework // JUnit diff --git a/integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java b/integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java index 04aa7a8d6..e09830127 100644 --- a/integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java +++ b/integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java @@ -26,6 +26,7 @@ import org.apache.cassandra.sidecar.adapters.base.CassandraFactory; import org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41Factory; +import org.apache.cassandra.sidecar.adapters.cassandra50.Cassandra50Factory; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.common.server.utils.DriverUtils; import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher; @@ -116,6 +117,7 @@ public static CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsR return new CassandraVersionProvider.Builder() .add(new CassandraFactory(dnsResolver, driverUtils, tableSchemaFetcher)) .add(new Cassandra41Factory(dnsResolver, driverUtils, tableSchemaFetcher)) + .add(new Cassandra50Factory(dnsResolver, driverUtils, tableSchemaFetcher)) .build(); } diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CompactionStopIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CompactionStopIntegrationTest.java new file mode 100644 index 000000000..2c6e3ea40 --- /dev/null +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CompactionStopIntegrationTest.java @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.util.concurrent.Uninterruptibles; + +import org.junit.jupiter.api.Test; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpResponseExpectation; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.HttpResponse; + +import org.apache.cassandra.sidecar.common.data.CompactionStopStatus; +import org.apache.cassandra.sidecar.common.response.CompactionStatsResponse; +import org.apache.cassandra.sidecar.common.response.CompactionStopResponse; +import org.apache.cassandra.sidecar.common.response.data.CompactionInfo; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; + +import static io.vertx.core.buffer.Buffer.buffer; +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.TEST_TABLE_PREFIX; +import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking; +import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; +import static org.assertj.core.api.Assertions.assertThat; + + +/** + * Integration tests for the Compaction Stop API endpoint + */ +class CompactionStopIntegrationTest extends SharedClusterSidecarIntegrationTestBase +{ + private static final String COMPACTION_STOP_ROUTE = "/api/v1/cassandra/operations/compaction/stop"; + private static final String COMPACTION_STATS_ROUTE = "/api/v1/cassandra/stats/compaction"; + private static final QualifiedName TEST_TABLE + = new QualifiedName(TEST_KEYSPACE, TEST_TABLE_PREFIX + "_compaction_test"); + private static final List COMPACTION_TEST_TABLES = new ArrayList<>(); + private static final int TABLE_COUNT = 5; + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration() + .additionalInstanceConfig(Map.of( + "concurrent_compactors", 1, // Single compactor for predictability + "compaction_throughput_mb_per_sec", 5, // Base throttling at 5 MB/s + "auto_snapshot", "false", // Disable auto snapshots + "compaction_large_partition_warning_threshold_mb", "1000", // Avoid large partition warnings + "auto_compaction", "false" // Disable ALL auto-compaction globally + )); + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + createTestTable(TEST_TABLE, "CREATE TABLE %s (\n id int PRIMARY KEY, \n data text \n);"); + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + + for (int i = 1; i <= TABLE_COUNT; i++) + { + COMPACTION_TEST_TABLES.add(new QualifiedName(TEST_KEYSPACE, TEST_TABLE_PREFIX + "_compaction_" + i)); + } + + // Create test tables for compaction activity + for (QualifiedName tableName : COMPACTION_TEST_TABLES) + { + createTestTable(tableName, "CREATE TABLE %s ( \n id int PRIMARY KEY, \n data text \n);"); + } + } + + @Override + protected void beforeTestStart() + { + // Wait for schema initialization + waitForSchemaReady(30, TimeUnit.SECONDS); + + // Disable auto-compaction for ALL keyspaces at the beginning + cluster.stream().forEach(instance -> { + try + { + // First set compaction throughput to a high value to prevent any initial compactions from taking too long + instance.nodetool("setcompactionthroughput", "100"); + + // Disable auto-compaction globally (no arguments) + instance.nodetool("disableautocompaction"); + logger.info("Disabled auto-compaction globally"); + + // And for our test keyspace + instance.nodetool("disableautocompaction", TEST_KEYSPACE); + + // Log that we've disabled auto-compaction + logger.info("Auto-compaction disabled for all keyspaces"); + } + catch (Exception e) + { + logger.warn("Failed to disable autocompaction in beforeTestStart: {}", e.getMessage()); + } + }); + } + + @Test + void testStopCompactionBothParameters() + { + String payload = "{\"compactionType\":\"VALIDATION\",\"compactionId\":\"test-id-123\"}"; + HttpResponse response + = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", COMPACTION_STOP_ROUTE) + .sendBuffer(buffer(payload)) + .expecting(HttpResponseExpectation.SC_OK)); + + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code()); + CompactionStopResponse stopResponse = response.bodyAsJson(CompactionStopResponse.class); + assertThat(stopResponse).isNotNull(); + assertThat(stopResponse.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + assertThat(stopResponse.compactionType()).isEqualTo("VALIDATION"); + assertThat(stopResponse.compactionId()).isEqualTo("test-id-123"); + } + + @Test + void testStopCompactionMissingBothParameters() + { + String payload = "{}"; + HttpResponse response + = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", COMPACTION_STOP_ROUTE) + .sendBuffer(buffer(payload))); + + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code()); + JsonObject errorResponse = response.bodyAsJsonObject(); + assertThat(errorResponse).isNotNull(); + } + + @Test + void testStopCompactionInvalidType() + { + String payload = "{\"compactionType\":\"INVALID_TYPE\"}"; + HttpResponse response + = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", COMPACTION_STOP_ROUTE) + .sendBuffer(buffer(payload))); + + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code()); + } + + @Test + void testStopCompactionMalformedJson() + { + String payload = "{invalid json"; + HttpResponse response + = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", COMPACTION_STOP_ROUTE) + .sendBuffer(buffer(payload))); + + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code()); + } + + @Test + void testStopCompactionAllSupportedTypes() + { + String[] supportedTypes = { + "COMPACTION", "VALIDATION", "KEY_CACHE_SAVE", "ROW_CACHE_SAVE", + "COUNTER_CACHE_SAVE", "CLEANUP", "SCRUB", "UPGRADE_SSTABLES", + "INDEX_BUILD", "TOMBSTONE_COMPACTION", "ANTICOMPACTION", + "VERIFY", "VIEW_BUILD", "INDEX_SUMMARY", "RELOCATE", + "GARBAGE_COLLECT", "MAJOR_COMPACTION" + }; + String cassandraVersion = testVersion.version(); + + for (String compactionType : supportedTypes) + { + String payload = String.format("{\"compactionType\":\"%s\"}", compactionType); + + HttpResponse response = getBlocking( + trustedClient().put(serverWrapper.serverPort, "localhost", COMPACTION_STOP_ROUTE) + .sendBuffer(buffer(payload)) + ); + if (compactionType.equals("MAJOR_COMPACTION") && cassandraVersion.startsWith("4.")) + { + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code()); + + } + else + { + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code()); + CompactionStopResponse stopResponse = response.bodyAsJson(CompactionStopResponse.class); + assertThat(stopResponse.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + assertThat(stopResponse.compactionType()).isEqualTo(compactionType); + } + } + } + + @Test + void testUnsupportedCompactionTypeForCassandraVersion() + { + String payload = "{\"compactionType\":\"MAJOR_COMPACTION\"}"; + HttpResponse response = getBlocking( + trustedClient().put(serverWrapper.serverPort, "localhost", COMPACTION_STOP_ROUTE) + .sendBuffer(buffer(payload)) + ); + String cassandraVersion = testVersion.version(); + + // Check MAJOR_COMPACTION rejected with Cassandra 4.x, accepted with 5.x + if (cassandraVersion.startsWith("4.")) + { + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code()); + JsonObject errorResponse = response.bodyAsJsonObject(); + assertThat(errorResponse).isNotNull(); + // Error message could be from handler validation or JMX layer + String message = errorResponse.getString("message"); + assertThat(message) + .satisfiesAnyOf( + msg -> assertThat(msg).containsIgnoringCase("not supported"), + msg -> assertThat(msg).containsIgnoringCase("No enum constant"), + msg -> assertThat(msg).contains("MAJOR_COMPACTION") + ); + } + else if (cassandraVersion.startsWith("5.")) + { + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code()); + CompactionStopResponse stopResponse = response.bodyAsJson(CompactionStopResponse.class); + assertThat(stopResponse.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + assertThat(stopResponse.compactionType()).isEqualTo("MAJOR_COMPACTION"); + } + else + { + // Unknown Cassandra version + throw new AssertionError("Unexpected Cassandra version: " + cassandraVersion); + } + } + + private void generateSSTables(QualifiedName tableName, int ssTableCount) + { + String largeData = "x".repeat(1000); // 1KB of data per row + + // Double-check auto-compaction is disabled before generating data + cluster.stream().forEach(instance -> { + try + { + instance.nodetool("disableautocompaction", TEST_KEYSPACE, tableName.table()); + logger.info("Confirmed auto-compaction disabled for table {} before data generation", + tableName.table()); + } + catch (Exception e) + { + logger.warn("Failed to confirm auto-compaction is disabled: {}", e.getMessage()); + } + }); + + int rowsPerBatch = 500; + + for (int batch = 0; batch < ssTableCount; batch++) + { + logger.info("Generating batch {} of {} for table {}", batch + 1, ssTableCount, tableName.table()); + + for (int i = batch * rowsPerBatch; i < (batch + 1) * rowsPerBatch; i++) + { + String statement = String.format("INSERT INTO %s (id, data) VALUES (%d, '%s');", + tableName, i, largeData + i); + cluster.schemaChangeIgnoringStoppedInstances(statement); + } + + // Flush after each batch but verify compaction is still disabled + final int currentBatch = batch + 1; + cluster.stream().forEach(instance -> { + try + { + // Flush only accepts one parameter (keyspace) + instance.flush(TEST_KEYSPACE); + logger.debug("Flushed keyspace {} for table {}", TEST_KEYSPACE, tableName.table()); + } + catch (Exception e) + { + logger.warn("Failed to flush: {}", e.getMessage()); + } + }); + } + } + + /** + * Verifies that compactions of the specified type are no longer active + */ + private void verifyCompactionStopped(String detectedCompactionId) + { + loopAssert(10, () -> { + HttpResponse statsResponse + = getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", COMPACTION_STATS_ROUTE) + .send() + .expecting(HttpResponseExpectation.SC_OK)); + + CompactionStatsResponse stats = statsResponse.bodyAsJson(CompactionStatsResponse.class); + + // Check if compactions of this TYPE are gone from active compactions + boolean compactionsOfTypeGone = stats.activeCompactions() + .stream() + .noneMatch(c -> c.id().equals(detectedCompactionId)); + + logger.info("Verification: Compaction with id {} type {} are gone={}, active count={}", + detectedCompactionId, "COMPACTION", compactionsOfTypeGone, + stats.activeCompactionsCount()); + + assertThat(compactionsOfTypeGone).isTrue(); + }); + } + + @Test + void testCompactionStopByTypeActuallyStopped() + { + long startTime = System.currentTimeMillis(); + + try + { + logger.info("Testing that compaction stop by type actually stops compactions"); + + // 2. THEN set compaction throughput to slow value + cluster.stream().forEach(instance -> { + try + { + instance.nodetool("setcompactionthroughput", "1"); // 1 MB/sec rather than unlimited + } + catch (Exception e) + { + logger.warn("Failed to set compaction throughput for stopByType: {}", e.getMessage()); + } + }); + + // 3. THEN generate data (with reduced volume) + for (QualifiedName tableName : COMPACTION_TEST_TABLES) + { + generateSSTables(tableName, 20); // Reduced from 200 to 20 + } + + for (QualifiedName tableName : COMPACTION_TEST_TABLES) + { + cluster.stream().forEach(instance -> { + try + { + instance.nodetool("enableautocompaction", TEST_KEYSPACE, tableName.table()); + } + catch (Exception e) + { + logger.warn("Failed to re-enable autocompaction: {}", e.getMessage()); + } + }); + } + + // Add initial delay to allow compaction to start + logger.info("Waiting for compaction to start..."); + + // Poll for active compaction and stop it + boolean compactionStopped = pollAndStopCompactionByType("Compaction", 30); + + if (!compactionStopped) + { + logger.error("Could not catch compaction in testable state - skipping test"); + } + } + finally + { + long duration = System.currentTimeMillis() - startTime; + logger.info("Test completed in {} ms", duration); + } + } + + /** + * Polls for an active compaction and attempts to stop it by type + * + * @param compactionType The type of compaction to look for and stop + * @param maxAttempts Maximum number of polling attempts + * @return true if a compaction was found and stopped successfully, false otherwise + */ + private boolean pollAndStopCompactionByType(String compactionType, int maxAttempts) + { + AtomicBoolean compactionStopped = new AtomicBoolean(false); + AtomicReference startingProgress = new AtomicReference<>(0.0); + AtomicReference actualCompactionType = new AtomicReference<>(compactionType); + + loopAssert(maxAttempts, () -> { + // Get current compaction stats + HttpResponse statsResponse + = getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", COMPACTION_STATS_ROUTE) + .send() + .expecting(HttpResponseExpectation.SC_OK)); + + CompactionStatsResponse stats = statsResponse.bodyAsJson(CompactionStatsResponse.class); + + if (stats.activeCompactions().isEmpty()) + { + logger.info("No active compactions found yet"); + Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); + throw new AssertionError("No active compactions found yet"); + } + + // Found active compaction + CompactionInfo compaction = stats.activeCompactions().get(0); + double progress = compaction.percentCompleted(); + + // Only proceed if compaction is in progress but not nearly complete + if (progress <= 0.0 || progress >= 90.0) + { + logger.info("Compaction at {}% - waiting for suitable progress", progress); + throw new AssertionError("Compaction not in suitable state to stop"); + } + + // Found a suitable compaction to stop + startingProgress.set(progress); + String originalTaskType = compaction.taskType(); + actualCompactionType.set(originalTaskType.toUpperCase()); + + logger.info("Found in-progress compaction - Type: '{}', Progress: {}%, ID: {}", + actualCompactionType.get(), progress, compaction.id()); + + // Stop compaction by type + String stopPayload = "{\"compactionType\":\"" + actualCompactionType.get() + "\"}"; + + HttpResponse stopResponse + = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", COMPACTION_STOP_ROUTE) + .sendBuffer(buffer(stopPayload)) + .expecting(HttpResponseExpectation.SC_OK)); + + assertThat(stopResponse.statusCode()).isEqualTo(HttpResponseStatus.OK.code()); + CompactionStopResponse response = stopResponse.bodyAsJson(CompactionStopResponse.class); + assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + + logger.info("Compaction stop called successfully for type: {} at {}% progress", + actualCompactionType.get(), startingProgress.get()); + + // Verify compaction was stopped + verifyCompactionStopped(compaction.id()); + + compactionStopped.set(true); + }); + + return compactionStopped.get(); + } + + @Test + void testCompactionStopByIdActuallyStopped() + { + long startTime = System.currentTimeMillis(); + + try + { + logger.info("Testing that compaction stop by ID actually stops compactions"); + + // 2. THEN set compaction throughput to slow value + cluster.stream().forEach(instance -> { + try + { + instance.nodetool("setcompactionthroughput", "1"); // 1 MB/sec rather than unlimited + } + catch (Exception e) + { + logger.warn("Failed to set compaction throughput for stopById: {}", e.getMessage()); + } + }); + + // 3. THEN generate data (with reduced volume) + for (QualifiedName tableName : COMPACTION_TEST_TABLES) + { + generateSSTables(tableName, 20); // Reduced from 200 to 20 + } + + for (QualifiedName tableName : COMPACTION_TEST_TABLES) + { + cluster.stream().forEach(instance -> { + try + { + instance.nodetool("enableautocompaction", TEST_KEYSPACE, tableName.table()); + } + catch (Exception e) + { + logger.warn("Failed to re-enable autocompaction: {}", e.getMessage()); + } + }); + } + + // Add initial delay to allow compaction to start + logger.info("Waiting for compaction to start..."); + + // Poll for active compaction and stop it by ID + try + { + pollAndStopCompactionById(30); + } + catch (Exception e) + { + logger.warn("Could not catch compaction in testable state"); + } + } + finally + { + long duration = System.currentTimeMillis() - startTime; + logger.info("Test completed in {} ms", duration); + } + } + + /** + * Polls for an active compaction and attempts to stop it by ID + * + * @param maxAttempts Maximum number of polling attempts + * @return true if a compaction was found and stopped successfully, false otherwise + */ + private boolean pollAndStopCompactionById(int maxAttempts) + { + AtomicBoolean compactionStopped = new AtomicBoolean(false); + AtomicReference startingProgress = new AtomicReference<>(0.0); + AtomicReference capturedCompactionId = new AtomicReference<>(""); + + loopAssert(maxAttempts, () -> { + // Get current compaction stats + HttpResponse statsResponse + = getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", COMPACTION_STATS_ROUTE) + .send() + .expecting(HttpResponseExpectation.SC_OK)); + + CompactionStatsResponse stats = statsResponse.bodyAsJson(CompactionStatsResponse.class); + + if (stats.activeCompactions().isEmpty()) + { + logger.info("No active compactions found yet"); + Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); + throw new AssertionError("No active compactions found yet"); + } + + // Found active compaction + CompactionInfo compaction = stats.activeCompactions().get(0); + double progress = compaction.percentCompleted(); + + // Only proceed if compaction is in progress but not nearly complete + if (progress <= 0.0 || progress >= 90.0) + { + logger.info("Compaction at {}% - waiting for suitable progress", progress); + throw new AssertionError("Compaction not in suitable state to stop"); + } + + // Found a suitable compaction to stop - capture its ID + startingProgress.set(progress); + capturedCompactionId.set(compaction.id()); + + logger.info("Found in-progress compaction - Type: '{}', Progress: {}%, ID: {}", + compaction.taskType(), progress, capturedCompactionId.get()); + + // Stop compaction by ID + String stopPayload = "{\"compactionId\":\"" + capturedCompactionId.get() + "\"}"; + + HttpResponse stopResponse + = getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", COMPACTION_STOP_ROUTE) + .sendBuffer(buffer(stopPayload)) + .expecting(HttpResponseExpectation.SC_OK)); + + assertThat(stopResponse).isNotNull(); + assertThat(stopResponse.statusCode()).isEqualTo(HttpResponseStatus.OK.code()); + CompactionStopResponse response = stopResponse.bodyAsJson(CompactionStopResponse.class); + assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + assertThat(response.compactionId()).isEqualTo(capturedCompactionId.get()); + + logger.info("Compaction stop called successfully for ID: {} at {}% progress", + capturedCompactionId.get(), startingProgress.get()); + + // Verify compaction was stopped + verifyCompactionStopped(capturedCompactionId.get()); + + compactionStopped.set(true); + }); + + return compactionStopped.get(); + } +} diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java index eddc6d7fc..9d9395c41 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java @@ -32,4 +32,24 @@ public interface CompactionManagerOperations * @return list of compaction info maps */ List> getCompactions(); + + /** + * Stops compaction based on compaction ID. + * This method takes precedence over stopCompaction if both type and ID are provided. + * + * @param compactionId the compaction ID to stop (nullable) + * @throws IllegalArgumentException if both parameters are null or empty + */ + void stopCompactionById(String compactionId); + + /** + * Stops compaction based on type if no compaction ID is provided. + * Checks for unupported compaction type across Cassandra versions, as set of compactions type varies slightly + * between Cassandra.4x and Cassandra.5x + * @param compactionType the compaction ID to stop (nullable) + * @throws IllegalArgumentException if both parameters are null or empty, or of the provided compactionType is unsupported + */ + void stopCompaction(String compactionType); + + List supportedCompactionTypes(); } diff --git a/server/build.gradle b/server/build.gradle index a4a1418db..97cac99a0 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -180,6 +180,7 @@ dependencies { implementation(project(":server-common")) implementation(project(":adapters:adapters-base")) implementation(project(":adapters:adapters-cassandra41")) + implementation(project(":adapters:adapters-cassandra50")) implementation(project(":vertx-auth-mtls")) implementation(project(":vertx-client")) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java index f25e4b341..9e9943eb2 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java @@ -80,6 +80,7 @@ public class BasicPermissions public static final Permission READ_RING_KEYSPACE_SCOPED = new DomainAwarePermission("RING:READ", KEYSPACE_SCOPE); public static final Permission READ_TOPOLOGY = new DomainAwarePermission("TOPOLOGY:READ", KEYSPACE_SCOPE); public static final Permission MODIFY_NATIVE = new DomainAwarePermission("NATIVE:MODIFY", CLUSTER_SCOPE); + public static final Permission MODIFY_COMPACTION = new DomainAwarePermission("COMPACTION:MODIFY", CLUSTER_SCOPE); // cassandra stats permissions diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandler.java new file mode 100644 index 000000000..3c712d192 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandler.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers; + +import java.util.Collections; +import java.util.Set; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.data.CompactionStopStatus; +import org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload; +import org.apache.cassandra.sidecar.common.response.CompactionStopResponse; +import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for stopping compaction operations via the Cassandra Compaction Manager. + * + *

Handles {@code PUT /api/v1/cassandra/operations/compaction/stop} requests to stop + * compaction operations. Expects a JSON payload with compactionType and/or compactionId: + *

+ *   { "compactionType": "COMPACTION", "compactionId": "abc-123" }
+ * 
+ */ +public class CompactionStopHandler extends AbstractHandler implements AccessProtected +{ + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the metadata fetcher + * @param executorPools executor pools for blocking executions + * @param validator validator for Cassandra-specific input + */ + @Inject + protected CompactionStopHandler(final InstanceMetadataFetcher metadataFetcher, + final ExecutorPools executorPools, + final CassandraInputValidator validator) + { + super(metadataFetcher, executorPools, validator); + } + + @Override + public Set requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.MODIFY_COMPACTION.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + CompactionStopRequestPayload request) + { + CompactionManagerOperations compactionManagerOps = metadataFetcher.delegate(host).compactionManagerOperations(); + + executorPools.service() + .executeBlocking(() -> stopCompaction(compactionManagerOps, request)) + .onSuccess(context::json) + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request)); + } + + /** + * Stops the compaction based on the request parameters + * + * @param operations the compaction manager operations + * @param request the request payload containing compactionType and/or compactionId + * @return CompactionStopResponse with the operation result + */ + private CompactionStopResponse stopCompaction(CompactionManagerOperations operations, + CompactionStopRequestPayload request) + { + String compactionType = request.compactionType(); + String compactionId = request.compactionId(); + + // Attempt to stop compaction + // If compactionId provided, use it (takes precedence over type) + if (request.hasValidCompactionId()) + { + operations.stopCompactionById(compactionId); + } + else if (request.hasValidCompactionType()) + { + operations.stopCompaction(compactionType); + } + // If we reach here, at least one of the above conditions was true due to validation in extractParamsOrThrow() + // Return success response + return CompactionStopResponse.builder() + .compactionType(compactionType) + .compactionId(compactionId) + .status(CompactionStopStatus.SUBMITTED) + .build(); + } + + /** + * Override extractParamsOrThrow to support compactionStop param constraints + * Method extracts and validates compaction stop request from routing context + * + * @param context the routing context + * @return validated CompactionStopRequestPayload + */ + @Override + protected CompactionStopRequestPayload extractParamsOrThrow(RoutingContext context) + { + String body = context.body().asString(); + CompactionStopRequestPayload payload; + + /* + Return 400 BAD_REQUEST upon malformed JSON - avoids falling under processFailure()'s vague + 500 INTERNAL_SERVER_ERROR. Also catches invalid compaction types + */ + try + { + payload = Json.decodeValue(body, CompactionStopRequestPayload.class); + } + catch (DecodeException e) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid JSON payload: " + e.getMessage()); + } + catch (IllegalArgumentException e) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage()); + } + + // Validate that at least one field is provided + if (!payload.atLeastOneParamProvided()) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "At least one of 'compactionType' or 'compactionId' must be provided"); + } + + return payload; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java index 8f7a2f6e4..b1588b114 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java @@ -27,6 +27,7 @@ import org.apache.cassandra.sidecar.common.ApiEndpointsV1; import org.apache.cassandra.sidecar.common.ApiEndpointsV2; import org.apache.cassandra.sidecar.common.response.CompactionStatsResponse; +import org.apache.cassandra.sidecar.common.response.CompactionStopResponse; import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; import org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse; @@ -39,6 +40,7 @@ import org.apache.cassandra.sidecar.common.response.v2.V2NodeSettings; import org.apache.cassandra.sidecar.db.schema.TableSchema; import org.apache.cassandra.sidecar.handlers.CompactionStatsHandler; +import org.apache.cassandra.sidecar.handlers.CompactionStopHandler; import org.apache.cassandra.sidecar.handlers.ConnectedClientStatsHandler; import org.apache.cassandra.sidecar.handlers.GossipInfoHandler; import org.apache.cassandra.sidecar.handlers.GossipUpdateHandler; @@ -113,6 +115,27 @@ VertxRoute cassandraCompactionStatsRoute(RouteBuilder.Factory factory, return factory.buildRouteWithHandler(compactionStatsHandler); } + @PUT + @Path(ApiEndpointsV1.COMPACTION_STOP_ROUTE) + @Operation(summary = "Stop compaction operation", + description = "Stops a compaction operation on the Cassandra node") + @APIResponse(description = "Compaction stop operation completed", + responseCode = "200", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = CompactionStopResponse.class))) + @APIResponse(responseCode = "400", + description = "Invalid request - malformed JSON body or \n invalid compaction parameters") + @ProvidesIntoMap + @KeyClassMapKey(VertxRouteMapKeys.CassandraCompactionStopRouteKey.class) + VertxRoute cassandraCompactionStopRoute(RouteBuilder.Factory factory, + CompactionStopHandler compactionStopHandler) + { + return factory.builderForRoute() + .setBodyHandler(true) // IMPORTANT: needed for JSON body parsing + .handler(compactionStopHandler) + .build(); + } + @GET @Path(ApiEndpointsV1.OPERATIONAL_JOB_ROUTE) @Operation(summary = "Get operational job status", diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java index ad21dd45b..dae5b61db 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java @@ -36,6 +36,7 @@ import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.adapters.base.CassandraFactory; import org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41Factory; +import org.apache.cassandra.sidecar.adapters.cassandra50.Cassandra50Factory; import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesMetadata; @@ -132,7 +133,8 @@ CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver, Drive return new CassandraVersionProvider.Builder() .add(new CassandraFactory(dnsResolver, driverUtils, tableSchemaFetcher)) .add(new Cassandra41Factory(dnsResolver, driverUtils, tableSchemaFetcher)) - .build(); + .add(new Cassandra50Factory(dnsResolver, driverUtils, tableSchemaFetcher)) + .build(); } @Provides diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java index 9875d90b2..2e0ce9be5 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java @@ -54,6 +54,11 @@ interface CassandraCompactionStatsRouteKey extends RouteClassKey HttpMethod HTTP_METHOD = HttpMethod.GET; String ROUTE_URI = ApiEndpointsV1.COMPACTION_STATS_ROUTE; } + interface CassandraCompactionStopRouteKey extends RouteClassKey + { + HttpMethod HTTP_METHOD = HttpMethod.PUT; + String ROUTE_URI = ApiEndpointsV1.COMPACTION_STOP_ROUTE; + } interface CassandraGossipHealthRouteKey extends RouteClassKey { HttpMethod HTTP_METHOD = HttpMethod.GET; diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java b/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java index f4167d01a..f502e8012 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java @@ -37,6 +37,7 @@ import org.apache.cassandra.distributed.shared.JMXUtil; import org.apache.cassandra.sidecar.adapters.base.CassandraFactory; import org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41Factory; +import org.apache.cassandra.sidecar.adapters.cassandra50.Cassandra50Factory; import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesMetadata; @@ -130,6 +131,7 @@ public static CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsR return new CassandraVersionProvider.Builder() .add(new CassandraFactory(dnsResolver, driverUtils, tableSchemaFetcher)) .add(new Cassandra41Factory(dnsResolver, driverUtils, tableSchemaFetcher)) + .add(new Cassandra50Factory(dnsResolver, driverUtils, tableSchemaFetcher)) .build(); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandlerTest.java new file mode 100644 index 000000000..523a66bae --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandlerTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; + +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.data.CompactionStopStatus; +import org.apache.cassandra.sidecar.common.response.CompactionStopResponse; +import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations; +import org.apache.cassandra.sidecar.modules.SidecarModules; +import org.apache.cassandra.sidecar.server.Server; + +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.vertx.core.buffer.Buffer.buffer; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link CompactionStopHandler} class + */ +@ExtendWith(VertxExtension.class) +public class CompactionStopHandlerTest +{ + private static final String TEST_ROUTE = "/api/v1/cassandra/operations/compaction/stop"; + + Vertx vertx; + Server server; + CompactionManagerOperations mockCompactionManagerOperations = mock(CompactionManagerOperations.class); + + @BeforeEach + void before() throws InterruptedException + { + Injector injector; + Module testOverride = Modules.override(new TestModule()).with(new CompactionStopHandlerTestModule()); + injector = Guice.createInjector(Modules.override(SidecarModules.all()).with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start().onSuccess(s -> context.completeNow()).onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + + // Mock supportedCompactionTypes to return all types for testing + when(mockCompactionManagerOperations.supportedCompactionTypes()).thenReturn( + java.util.Arrays.asList("COMPACTION", "VALIDATION", "KEY_CACHE_SAVE", "ROW_CACHE_SAVE", + "COUNTER_CACHE_SAVE", "CLEANUP", "SCRUB", "UPGRADE_SSTABLES", + "INDEX_BUILD", "TOMBSTONE_COMPACTION", "ANTICOMPACTION", + "VERIFY", "VIEW_BUILD", "INDEX_SUMMARY", "RELOCATE", + "GARBAGE_COLLECT", "MAJOR_COMPACTION")); + } + + @AfterEach + void after() throws InterruptedException + { + if (server != null) + { + VertxTestContext closeContext = new VertxTestContext(); + server.close() + .onComplete(res -> closeContext.completeNow()); + closeContext.awaitCompletion(60, TimeUnit.SECONDS); + } + } + + @Test + void testStopCompactionByTypeHappyPath(VertxTestContext ctx) + { + WebClient client = WebClient.create(vertx); + String payload = "{\"compactionType\":\"COMPACTION\"}"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + verify(mockCompactionManagerOperations, times(1)).stopCompaction(eq("COMPACTION")); + + assertThat(resp.statusCode()).isEqualTo(OK.code()); + CompactionStopResponse response = resp.bodyAsJson(CompactionStopResponse.class); + assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + assertThat(response.compactionType()).isEqualTo("COMPACTION"); + }); + ctx.completeNow(); + })); + } + + @Test + void testStopCompactionByIdHappyPath(VertxTestContext ctx) + { + WebClient client = WebClient.create(vertx); + String payload = "{\"compactionId\":\"abc-123\"}"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + verify(mockCompactionManagerOperations, times(1)).stopCompactionById(eq("abc-123")); + + assertThat(resp.statusCode()).isEqualTo(OK.code()); + CompactionStopResponse response = resp.bodyAsJson(CompactionStopResponse.class); + assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + assertThat(response.compactionId()).isEqualTo("abc-123"); + }); + ctx.completeNow(); + })); + } + + @Test + void testStopCompactionByBothFieldsHappyPath(VertxTestContext ctx) + { + WebClient client = WebClient.create(vertx); + String payload = "{\"compactionType\":\"VALIDATION\",\"compactionId\":\"xyz-456\"}"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + verify(mockCompactionManagerOperations, times(1)) + .stopCompactionById(eq("xyz-456")); + + assertThat(resp.statusCode()).isEqualTo(OK.code()); + CompactionStopResponse response = resp.bodyAsJson(CompactionStopResponse.class); + assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + assertThat(response.compactionType()).isEqualTo("VALIDATION"); + assertThat(response.compactionId()).isEqualTo("xyz-456"); + }); + ctx.completeNow(); + })); + } + + @Test + void testMissingBothFields(VertxTestContext ctx) + { + WebClient client = WebClient.create(vertx); + String payload = "{}"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code()); + verify(mockCompactionManagerOperations, times(0)) + .stopCompaction(anyString()); + }); + ctx.completeNow(); + })); + } + + @Test + void testBothFieldsEmpty(VertxTestContext ctx) + { + WebClient client = WebClient.create(vertx); + String payload = "{\"compactionType\":\"\",\"compactionId\":\"\"}"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code()); + verify(mockCompactionManagerOperations, times(0)) + .stopCompaction(anyString()); + }); + ctx.completeNow(); + })); + } + + @Test + void testInvalidCompactionType(VertxTestContext ctx) + { + // Configure mock to throw exception for invalid compaction type + doThrow(new IllegalArgumentException("compaction type INVALID_TYPE is not supported")) + .when(mockCompactionManagerOperations).stopCompaction("INVALID_TYPE"); + + WebClient client = WebClient.create(vertx); + String payload = "{\"compactionType\":\"INVALID_TYPE\"}"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code()); + verify(mockCompactionManagerOperations, times(1)) + .stopCompaction(eq("INVALID_TYPE")); + }); + ctx.completeNow(); + })); + } + + @Test + void testMalformedJson(VertxTestContext ctx) + { + WebClient client = WebClient.create(vertx); + String payload = "{invalid json"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code()); + verify(mockCompactionManagerOperations, times(0)) + .stopCompaction(anyString()); + }); + ctx.completeNow(); + })); + } + + @Test + void testTrimWhitespace(VertxTestContext ctx) + { + WebClient client = WebClient.create(vertx); + String payload = "{\"compactionType\":\" COMPACTION \"}"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + // Should trim and pass as uppercase enum name + verify(mockCompactionManagerOperations, times(1)) + .stopCompaction(eq("COMPACTION")); + + assertThat(resp.statusCode()).isEqualTo(OK.code()); + }); + ctx.completeNow(); + })); + } + + @Test + void testCaseInsensitiveCompactionType(VertxTestContext ctx) + { + WebClient client = WebClient.create(vertx); + String payload = "{\"compactionType\":\"compaction\"}"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + // Should accept lowercase input and send uppercase to Cassandra + verify(mockCompactionManagerOperations, times(1)) + .stopCompaction(eq("COMPACTION")); + + assertThat(resp.statusCode()).isEqualTo(OK.code()); + }); + ctx.completeNow(); + })); + } + + @Test + void testAllSupportedCompactionTypes(VertxTestContext ctx) throws InterruptedException + { + String[] supportedTypes = { + "COMPACTION", "VALIDATION", "KEY_CACHE_SAVE", "ROW_CACHE_SAVE", + "COUNTER_CACHE_SAVE", "CLEANUP", "SCRUB", "UPGRADE_SSTABLES", + "INDEX_BUILD", "TOMBSTONE_COMPACTION", "ANTICOMPACTION", + "VERIFY", "VIEW_BUILD", "INDEX_SUMMARY", "RELOCATE", + "GARBAGE_COLLECT", "MAJOR_COMPACTION" + }; + + WebClient client = WebClient.create(vertx); + + CountDownLatch expectedCalls = new CountDownLatch(supportedTypes.length); + for (String type : supportedTypes) + { + String payload = "{\"compactionType\":\"" + type + "\"}"; + client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE) + .sendBuffer(buffer(payload), ctx.succeeding(resp -> { + ctx.verify(() -> { + assertThat(resp.statusCode()).isEqualTo(OK.code()); + CompactionStopResponse response = resp.bodyAsJson(CompactionStopResponse.class); + assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED); + expectedCalls.countDown(); + }); + })); + } + expectedCalls.await(30, TimeUnit.SECONDS); + ctx.completeNow(); + } + + /** + * Test guice module for {@link CompactionStopHandler} tests + */ + class CompactionStopHandlerTestModule extends AbstractModule + { + @Provides + @Singleton + public InstancesMetadata instanceMetadata() + { + final int instanceId = 100; + final String host = "127.0.0.1"; + final InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.host()).thenReturn(host); + when(instanceMetadata.port()).thenReturn(9042); + when(instanceMetadata.id()).thenReturn(instanceId); + when(instanceMetadata.stagingDir()).thenReturn(""); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + when(delegate.compactionManagerOperations()).thenReturn(mockCompactionManagerOperations); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata); + when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata); + + return mockInstancesMetadata; + } + } +} diff --git a/settings.gradle b/settings.gradle index 5d91edb02..cfb616788 100644 --- a/settings.gradle +++ b/settings.gradle @@ -29,6 +29,7 @@ include "vertx-client-shaded" if (JavaVersion.current().isJava11Compatible()) { include "adapters:adapters-base" include "adapters:adapters-cassandra41" + include "adapters:adapters-cassandra50" include "docs" include "server" include "server-common"