From 5def3c4d24daff8800d3edca511695f0aab918a4 Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Mon, 29 Sep 2025 21:13:53 -0700 Subject: [PATCH 01/10] CASSSIDECAR-344: Sidecar endpoint for moving a node to a new token --- CHANGES.txt | 1 + .../base/CassandraStorageOperations.java | 10 + .../GossipDependentStorageJmxOperations.java | 6 + .../base/jmx/StorageJmxOperations.java | 7 + .../sidecar/common/ApiEndpointsV1.java | 1 + .../common/request/NodeMoveRequest.java | 48 +++ .../sidecar/client/RequestContext.java | 13 + .../sidecar/client/SidecarClient.java | 15 + .../sidecar/client/SidecarClientTest.java | 20 ++ ...assandraNodeOperationsIntegrationTest.java | 161 ++++++++++ .../common/server/StorageOperations.java | 7 + .../acl/authorization/BasicPermissions.java | 1 + .../sidecar/handlers/AbstractHandler.java | 35 +++ .../sidecar/handlers/NodeMoveHandler.java | 113 +++++++ .../cassandra/sidecar/job/NodeMoveJob.java | 71 +++++ .../modules/CassandraOperationsModule.java | 21 ++ .../multibindings/VertxRouteMapKeys.java | 5 + .../sidecar/handlers/NodeMoveHandlerTest.java | 292 ++++++++++++++++++ .../sidecar/job/NodeMoveJobTest.java | 161 ++++++++++ 19 files changed, 988 insertions(+) create mode 100644 client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java create mode 100644 integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java create mode 100644 server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java create mode 100644 server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java create mode 100644 server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java create mode 100644 server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java diff --git a/CHANGES.txt b/CHANGES.txt index 0fb5a9325..2036d33da 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.3.0 ----- + * Sidecar endpoint for moving a node to a new token (CASSSIDECAR-344) * Improve FilteringMetricRegistry implementation (CASSSIDECAR-347) * Add lifecycle APIs for starting and stopping Cassandra (CASSSIDECAR-266) * Implementation of CassandraClusterSchemaMonitor (CASSSIDECAR-245) diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java index 7bfdfdd4b..5ec27ec4f 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java @@ -323,4 +323,14 @@ public int getCompactionThroughputMbPerSec() return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) .getCompactionThroughputMbPerSec(); } + + /** + * {@inheritDoc} + */ + @Override + public void move(String newToken) + { + jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) + .move(newToken); + } } diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java index a40255edf..28ca98c1d 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java @@ -219,4 +219,10 @@ public int getCompactionThroughputMbPerSec() { return delegate.getCompactionThroughputMbPerSec(); } + + @Override + public void move(String newToken) + { + delegate.move(newToken); + } } diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java index c76ccce6d..7b0c482df 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java @@ -228,4 +228,11 @@ public interface StorageJmxOperations * @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined */ int getCompactionThroughputMbPerSec(); + + /** + * Triggers the node move operation to move this node to a new token + * + * @param newToken the new token for the node to move to + */ + void move(String newToken); } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index d67e4cdc3..90a1a2e10 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -140,6 +140,7 @@ public final class ApiEndpointsV1 public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS; public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB; public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + "/operations/decommission"; + public static final String NODE_MOVE_ROUTE = API_V1 + CASSANDRA + "/operations/move"; public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams"; public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + PER_TABLE + "/stats"; diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java new file mode 100644 index 000000000..1c9e11741 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + +/** + * Represents a request to execute node move operation + */ +public class NodeMoveRequest extends JsonRequest +{ + /** + * Constructs a request to execute a node move operation + * + * @param newToken the new token for the node to move to + */ + public NodeMoveRequest(String newToken) + { + super(ApiEndpointsV1.NODE_MOVE_ROUTE + "?newToken=" + newToken); + } + + /** + * {@inheritDoc} + */ + @Override + public HttpMethod method() + { + return HttpMethod.PUT; + } +} diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java index 2e630c315..7d2294171 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java @@ -45,6 +45,7 @@ import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest; import org.apache.cassandra.sidecar.common.request.NativeUpdateRequest; import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest; +import org.apache.cassandra.sidecar.common.request.NodeMoveRequest; import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest; import org.apache.cassandra.sidecar.common.request.OperationalJobRequest; import org.apache.cassandra.sidecar.common.request.ReportSchemaRequest; @@ -582,6 +583,18 @@ public Builder nodeDecommissionRequest() return request(NODE_DECOMMISSION_REQUEST); } + /** + * Sets the {@code request} to be a {@link NodeMoveRequest} and returns a reference to this Builder + * enabling method chaining. + * + * @param newToken the new token for the node to move to + * @return a reference to this Builder + */ + public Builder nodeMoveRequest(String newToken) + { + return request(new NodeMoveRequest(newToken)); + } + /** * Sets the {@code request} to be a {@link GossipUpdateRequest} for the * given {@link NodeCommandRequestPayload.State state}, and returns a reference to this Builder enabling method chaining. diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index 373bd336e..615707882 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -823,6 +823,21 @@ public CompletableFuture nodeDecommission(SidecarInstanc .build()); } + /** + * Executes the node move request using the default retry policy and configured selection policy + * + * @param instance the instance where the request will be executed + * @param newToken the new token for the node to move to + * @return a completable future of the operational job response + */ + public CompletableFuture nodeMove(SidecarInstance instance, String newToken) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .nodeMoveRequest(newToken) + .build()); + } + /** * Sends a request to start or stop Cassandra gossiping on the provided instance. *

diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index b13a209bc..d38c8269c 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -1393,6 +1393,26 @@ public void testNodeDecommission() throws Exception validateResponseServed(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE); } + @Test + public void testNodeMove() throws Exception + { + UUID jobId = UUID.randomUUID(); + String newToken = "123456789"; + String nodeMoveString = "{\"jobId\":\"" + jobId + "\",\"jobStatus\":\"SUCCEEDED\",\"instance\":\"127.0.0.1\"}"; + + MockResponse response = new MockResponse() + .setResponseCode(OK.code()) + .setHeader("content-type", "application/json") + .setBody(nodeMoveString); + enqueue(response); + + SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(servers.get(0)); + OperationalJobResponse result = client.nodeMove(sidecarInstance, newToken).get(30, TimeUnit.SECONDS); + assertThat(result).isNotNull(); + assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + validateResponseServed(ApiEndpointsV1.NODE_MOVE_ROUTE + "?newToken=" + newToken); + } + @Test void testFailsWithOneAttemptPerServer() { diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java new file mode 100644 index 000000000..806e6b508 --- /dev/null +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.HttpResponse; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking; +import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for Cassandra node operations + */ +public class CassandraNodeOperationsIntegrationTest extends SharedClusterSidecarIntegrationTestBase +{ + public static final String CASSANDRA_VERSION_4_0 = "4.0"; + + @Override + protected void initializeSchemaForTest() + { + // No schema init needed + } + + @Override + protected void beforeTestStart() + { + // wait for the schema initialization + waitForSchemaReady(30, TimeUnit.SECONDS); + } + + @Test + void testNodeMoveOperationSuccess() + { + // Use a test token - this is a valid token for Murmur3Partitioner + String testToken = "123456789"; + + // Initiate move operation + HttpResponse moveResponse = getBlocking( + trustedClient().put(serverWrapper.serverPort, "localhost", ApiEndpointsV1.NODE_MOVE_ROUTE + "?newToken=" + testToken) + .send()); + + assertThat(moveResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = moveResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); + assertThat(responseBody.getString("jobId")).isNotNull(); + assertThat(responseBody.getString("operation")).isEqualTo("move"); + assertThat(responseBody.getString("jobStatus")).isIn( + OperationalJobStatus.CREATED.name(), + OperationalJobStatus.RUNNING.name(), + OperationalJobStatus.SUCCEEDED.name() + ); + + // Verify the job eventually completes (or at least gets processed) + loopAssert(30, 500, () -> { + HttpResponse streamStatsResponse = getBlocking( + trustedClient().get(serverWrapper.serverPort, "localhost", ApiEndpointsV1.STREAM_STATS_ROUTE) + .send()); + + assertThat(streamStatsResponse.statusCode()).isEqualTo(OK.code()); + + JsonObject streamStats = streamStatsResponse.bodyAsJsonObject(); + assertThat(streamStats).isNotNull(); + // The operationMode should be either NORMAL (completed) or MOVING (in progress) + assertThat(streamStats.getString("operationMode")).isIn("NORMAL", "MOVING"); + }); + + // Validate the operational job status using the OperationalJobHandler + String jobId = responseBody.getString("jobId"); + validateOperationalJobStatus(jobId, "move"); + } + + /** + * Validates the operational job status by querying the OperationalJobHandler endpoint + * and waiting for the job to reach a final state if necessary. + * + * @param jobId the ID of the operational job to validate + * @param expectedOperation the expected operation name (e.g., "move", "decommission", "drain") + */ + private void validateOperationalJobStatus(String jobId, String expectedOperation) + { + String operationalJobRoute = ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId); + + HttpResponse jobStatusResponse = getBlocking( + trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) + .send()); + + assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code()); + + JsonObject jobStatusBody = jobStatusResponse.bodyAsJsonObject(); + assertThat(jobStatusBody).isNotNull(); + assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); + assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation); + assertThat(jobStatusBody.getString("jobStatus")).isIn( + OperationalJobStatus.RUNNING.name(), + OperationalJobStatus.SUCCEEDED.name() + ); + + // If the job is still running, wait for it to complete or reach a final state + if (OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus"))) + { + loopAssert(30, 500, () -> { + HttpResponse finalJobStatusResponse = getBlocking( + trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) + .send()); + + assertThat(finalJobStatusResponse.statusCode()).isEqualTo(OK.code()); + + JsonObject finalJobStatusBody = finalJobStatusResponse.bodyAsJsonObject(); + assertThat(finalJobStatusBody).isNotNull(); + assertThat(finalJobStatusBody.getString("jobStatus")).isIn( + OperationalJobStatus.SUCCEEDED.name(), + OperationalJobStatus.FAILED.name() + ); + }); + } + } + + /** + * {@inheritDoc} + */ + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + catch (IllegalStateException ex) + { + logger.error("Exception in tear down", ex); + } + } +} \ No newline at end of file diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java index d135e61d6..e0b0252e5 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java @@ -174,4 +174,11 @@ default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String tab * @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined */ int getCompactionThroughputMbPerSec(); + + /** + * Triggers the node move operation to move the node to a new token. + * + * @param newToken the new token for the node to move to + */ + void move(String newToken); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java index b57ac6234..1acd0b4f8 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java @@ -62,6 +62,7 @@ public class BasicPermissions // sidecar operation related permissions public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE); public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE); + public static final Permission MOVE_NODE = new DomainAwarePermission("NODE:MOVE", OPERATION_SCOPE); // Permissions related to Schema Reporting public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:PUBLISH", CLUSTER_SCOPE); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java index a7c4f7f69..72cc0a955 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java @@ -29,14 +29,21 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.HttpException; import org.apache.cassandra.sidecar.adapters.base.exception.OperationUnavailableException; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.apache.cassandra.sidecar.common.server.exceptions.JmxAuthenticationException; import org.apache.cassandra.sidecar.common.utils.Preconditions; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; +import org.apache.cassandra.sidecar.job.OperationalJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.OperationalJobUtils; import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; @@ -317,4 +324,32 @@ public static String extractHostAddressWithoutPort(HttpServerRequest request) th } return host; } + + /** + * Handles the submission and execution of an operational job. + * + * @param jobManager the manager responsible for submitting and tracking operational jobs + * @param config the service configuration containing execution parameters + * @param context the routing context for the HTTP request/response + * @param job the operational job to be executed + */ + protected void handleOperationalJob(OperationalJobManager jobManager, ServiceConfiguration config, RoutingContext context, OperationalJob job) + { + try + { + jobManager.trySubmitJob(job); + } + catch (OperationalJobConflictException oje) + { + String reason = oje.getMessage(); + logger.error("Conflicting job encountered. reason={}", reason); + context.response().setStatusCode(HttpResponseStatus.CONFLICT.code()); + context.json(new OperationalJobResponse(job.jobId(), OperationalJobStatus.FAILED, job.name(), reason)); + return; + } + + // Get the result, waiting for the specified wait time for result + job.asyncResult(executorPools.service(), config.operationalJobExecutionMaxWaitTime()) + .onComplete(v -> OperationalJobUtils.sendStatusBasedResponse(context, job)); + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java new file mode 100644 index 000000000..975e96b04 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.Set; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.job.NodeMoveJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +/** + * Provides REST API for asynchronously moving the corresponding Cassandra node to a new token + */ +public class NodeMoveHandler extends AbstractHandler implements AccessProtected +{ + private final OperationalJobManager jobManager; + private final ServiceConfiguration config; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected NodeMoveHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + ServiceConfiguration serviceConfiguration, + CassandraInputValidator validator, + OperationalJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + this.config = serviceConfiguration; + } + + @Override + public Set requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.MOVE_NODE.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + String newToken) + { + StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); + NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), newToken, operations); + handleOperationalJob(jobManager, config, context, job); + } + + /** + * {@inheritDoc} + */ + @Override + protected String extractParamsOrThrow(RoutingContext context) + { + String newToken = context.request().getParam("newToken"); + if (newToken == null || newToken.isBlank()) + { + throw new IllegalArgumentException("newToken parameter is required"); + } + + String trimmedToken = newToken.trim(); + try + { + new BigInteger(trimmedToken); + } + catch (NumberFormatException e) + { + throw new IllegalArgumentException( + String.format("newToken parameter must be a valid integer. Provided value=%s", newToken), e); + } + return trimmedToken; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java new file mode 100644 index 000000000..f9f3cbf8d --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.server.StorageOperations; + +/** + * Implementation of {@link OperationalJob} to perform node move operation. + */ +public class NodeMoveJob extends OperationalJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NodeMoveJob.class); + private static final String OPERATION = "move"; + private static final String OPERATION_MODE_MOVING = "MOVING"; + private final String newToken; + protected StorageOperations storageOperations; + + public NodeMoveJob(UUID jobId, String newToken, StorageOperations storageOps) + { + super(jobId); + this.newToken = newToken; + this.storageOperations = storageOps; + } + + @Override + public boolean isRunningOnCassandra() + { + String operationMode = storageOperations.operationMode(); + return OPERATION_MODE_MOVING.equals(operationMode); + } + + /** + * {@inheritDoc} + */ + @Override + protected void executeInternal() + { + LOGGER.info("Executing move operation. jobId={} newToken={}", this.jobId(), newToken); + storageOperations.move(newToken); + } + + /** + * {@inheritDoc} + */ + @Override + public String name() + { + return OPERATION; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java index 9b1278a88..d7a183469 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java @@ -45,6 +45,7 @@ import org.apache.cassandra.sidecar.handlers.ListOperationalJobsHandler; import org.apache.cassandra.sidecar.handlers.NativeUpdateHandler; import org.apache.cassandra.sidecar.handlers.NodeDecommissionHandler; +import org.apache.cassandra.sidecar.handlers.NodeMoveHandler; import org.apache.cassandra.sidecar.handlers.OperationalJobHandler; import org.apache.cassandra.sidecar.handlers.RingHandler; import org.apache.cassandra.sidecar.handlers.SchemaHandler; @@ -156,6 +157,26 @@ VertxRoute cassandraNodeDecommissionRoute(RouteBuilder.Factory factory, return factory.buildRouteWithHandler(nodeDecommissionHandler); } + @PUT + @Path(ApiEndpointsV1.NODE_MOVE_ROUTE) + @Operation(summary = "Move node to new token", + description = "Moves the Cassandra node to a new token in the ring") + @APIResponse(description = "Node move operation completed successfully", + responseCode = "200", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = OperationalJobResponse.class))) + @APIResponse(description = "Node move operation initiated successfully", + responseCode = "202", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = OperationalJobResponse.class))) + @ProvidesIntoMap + @KeyClassMapKey(VertxRouteMapKeys.CassandraNodeMoveRouteKey.class) + VertxRoute cassandraNodeMoveRoute(RouteBuilder.Factory factory, + NodeMoveHandler nodeMoveHandler) + { + return factory.buildRouteWithHandler(nodeMoveHandler); + } + @GET @Path(ApiEndpointsV1.STREAM_STATS_ROUTE) @Operation(summary = "Get stream statistics", diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java index e3bec846f..5a0b16451 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java @@ -83,6 +83,11 @@ interface CassandraNodeDecommissionRouteKey extends RouteClassKey HttpMethod HTTP_METHOD = HttpMethod.PUT; String ROUTE_URI = ApiEndpointsV1.NODE_DECOMMISSION_ROUTE; } + interface CassandraNodeMoveRouteKey extends RouteClassKey + { + HttpMethod HTTP_METHOD = HttpMethod.PUT; + String ROUTE_URI = ApiEndpointsV1.NODE_MOVE_ROUTE; + } interface CassandraNodeSettingsRouteKey extends RouteClassKey { HttpMethod HTTP_METHOD = HttpMethod.GET; diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java new file mode 100644 index 000000000..2337429a1 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.modules.SidecarModules; +import org.apache.cassandra.sidecar.server.Server; +import org.mockito.AdditionalAnswers; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link NodeMoveHandler} + */ +@ExtendWith(VertxExtension.class) +public class NodeMoveHandlerTest +{ + static final Logger LOGGER = LoggerFactory.getLogger(NodeMoveHandlerTest.class); + public static final String MOVE_ROUTE = "/api/v1/cassandra/operations/move"; + public static final String LOCAL_HOST = "127.0.0.1"; + public static final String OPERATION_MODE_MOVING = "MOVING"; + public static final String OPERATION_MODE_NORMAL = "NORMAL"; + Vertx vertx; + Server server; + StorageOperations mockStorageOperations = mock(StorageOperations.class); + + @BeforeEach + void before() throws InterruptedException + { + Injector injector; + Module testOverride = Modules.override(new TestModule()) + .with(new NodeMoveHandlerTest.NodeMoveTestModule()); + injector = Guice.createInjector(Modules.override(SidecarModules.all()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void testMoveLongRunning(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null)) + .when(mockStorageOperations).move(anyString()); + + WebClient client = WebClient.create(vertx); + String testRoute = MOVE_ROUTE + "?newToken=123456789"; + client.put(server.actualPort(), LOCAL_HOST, testRoute) + .expect(ResponsePredicate.SC_ACCEPTED) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(ACCEPTED.code()); + OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(moveResponse).isNotNull(); + assertThat(moveResponse.status()).isEqualTo(RUNNING); + assertThat(moveResponse.operation()).isEqualTo("move"); + context.completeNow(); + })); + } + + @Test + void testMoveCompleted(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + WebClient client = WebClient.create(vertx); + String testRoute = MOVE_ROUTE + "?newToken=123456789"; + client.put(server.actualPort(), LOCAL_HOST, testRoute) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + LOGGER.info("Move Response: {}", response.bodyAsString()); + + OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(moveResponse).isNotNull(); + assertThat(moveResponse.status()).isEqualTo(SUCCEEDED); + assertThat(moveResponse.operation()).isEqualTo("move"); + verify(mockStorageOperations).move("123456789"); + context.completeNow(); + })); + } + + @Test + void testMoveFailed(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + doThrow(new RuntimeException("Simulated failure")).when(mockStorageOperations).move(anyString()); + WebClient client = WebClient.create(vertx); + String testRoute = MOVE_ROUTE + "?newToken=123456789"; + client.put(server.actualPort(), LOCAL_HOST, testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + context.completeNow(); + })); + } + + @Test + void testMoveConflictAlreadyMoving(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_MOVING); + WebClient client = WebClient.create(vertx); + String testRoute = MOVE_ROUTE + "?newToken=123456789"; + client.put(server.actualPort(), LOCAL_HOST, testRoute) + .expect(ResponsePredicate.SC_CONFLICT) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(CONFLICT.code()); + LOGGER.info("Move Response: {}", response.bodyAsString()); + OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(moveResponse).isNotNull(); + assertThat(moveResponse.jobId()).isNotNull(); + verify(mockStorageOperations, never()).move(anyString()); // Should not call move when already moving + context.completeNow(); + })); + } + + @Test + void testMoveWithMissingToken(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) + .expect(ResponsePredicate.SC_BAD_REQUEST) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + verify(mockStorageOperations, never()).move(anyString()); + context.completeNow(); + })); + } + + @Test + void testMoveWithEmptyToken(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + String testRoute = MOVE_ROUTE + "?newToken="; // Empty token parameter + client.put(server.actualPort(), LOCAL_HOST, testRoute) + .expect(ResponsePredicate.SC_BAD_REQUEST) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + verify(mockStorageOperations, never()).move(anyString()); + context.completeNow(); + })); + } + + @Test + void testMoveWithInvalidToken(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + String testRoute = MOVE_ROUTE + "?newToken=invalidtoken"; // Invalid token parameter + client.put(server.actualPort(), LOCAL_HOST, testRoute) + .expect(ResponsePredicate.SC_BAD_REQUEST) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + verify(mockStorageOperations, never()).move(anyString()); + context.completeNow(); + })); + } + + @Test + void testMoveWithNegativeToken(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + WebClient client = WebClient.create(vertx); + String testRoute = MOVE_ROUTE + "?newToken=-9223372036854775808"; // Negative token (valid for Murmur3) + client.put(server.actualPort(), LOCAL_HOST, testRoute) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(moveResponse).isNotNull(); + assertThat(moveResponse.status()).isEqualTo(SUCCEEDED); + verify(mockStorageOperations).move("-9223372036854775808"); + context.completeNow(); + })); + } + + @Test + void testMoveWithZeroToken(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + WebClient client = WebClient.create(vertx); + String testRoute = MOVE_ROUTE + "?newToken=0"; // Zero token + client.put(server.actualPort(), LOCAL_HOST, testRoute) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(moveResponse).isNotNull(); + assertThat(moveResponse.status()).isEqualTo(SUCCEEDED); + verify(mockStorageOperations).move("0"); + context.completeNow(); + })); + } + + /** + * Test guice module for Node Move handler tests + */ + class NodeMoveTestModule extends AbstractModule + { + @Provides + @Singleton + public InstancesMetadata instanceMetadata() + { + final int instanceId = 100; + final String host = LOCAL_HOST; + final InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.host()).thenReturn(host); + when(instanceMetadata.port()).thenReturn(9042); + when(instanceMetadata.id()).thenReturn(instanceId); + when(instanceMetadata.stagingDir()).thenReturn(""); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + + when(delegate.storageOperations()).thenReturn(mockStorageOperations); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata); + when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata); + + return mockInstancesMetadata; + } + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java new file mode 100644 index 000000000..50ab789ba --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.datastax.driver.core.utils.UUIDs; +import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link NodeMoveJob} + */ +class NodeMoveJobTest +{ + public static final String OPERATION_MOVE = "move"; + public static final String OPERATION_MODE_MOVING = "MOVING"; + public static final String OPERATION_MODE_NORMAL = "NORMAL"; + public static final String OPERATION_MODE_JOINING = "JOINING"; + private StorageOperations mockStorageOperations; + private UUID jobId; + private String newToken; + + @BeforeEach + void setUp() + { + mockStorageOperations = mock(StorageOperations.class); + jobId = UUIDs.timeBased(); + newToken = "123456789"; + } + + @Test + void testJobName() + { + NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + assertThat(job.name()).isEqualTo(OPERATION_MOVE); + } + + @Test + void testJobId() + { + NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + assertThat(job.jobId()).isEqualTo(jobId); + } + + @Test + void testIsRunningOnCassandraWhenMoving() + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_MOVING); + NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + assertThat(job.isRunningOnCassandra()).isTrue(); + } + + @Test + void testIsRunningOnCassandraWhenNormal() + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + assertThat(job.isRunningOnCassandra()).isFalse(); + } + + @Test + void testIsRunningOnCassandraWhenOtherMode() + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_JOINING); + NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + assertThat(job.isRunningOnCassandra()).isFalse(); + } + + @Test + void testStatusWhenNormal() + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + assertThat(job.status()).isEqualTo(OperationalJobStatus.CREATED); + } + + @Test + void testStatusWhenFailed() + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + RuntimeException testException = new RuntimeException("Test failure"); + doThrow(testException).when(mockStorageOperations).move(newToken); + + NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + + Promise promise = Promise.promise(); + job.execute(promise); + + assertThat(promise.future().failed()).isTrue(); + assertThat(job.status()).isEqualTo(OperationalJobStatus.FAILED); + } + + @Test + void testExecuteInternalCallsMove() + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + + Promise promise = Promise.promise(); + job.execute(promise); + + verify(mockStorageOperations).move(newToken); + assertThat(promise.future().succeeded()).isTrue(); + } + + @Test + void testExecuteInternalHandlesException() + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + RuntimeException testException = new RuntimeException("Test exception"); + doThrow(testException).when(mockStorageOperations).move(newToken); + + NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + + Promise promise = Promise.promise(); + job.execute(promise); + + verify(mockStorageOperations).move(newToken); + assertThat(promise.future().failed()).isTrue(); + assertThat(promise.future().cause()).isInstanceOf(OperationalJobException.class); + assertThat(promise.future().cause().getCause()).isEqualTo(testException); + assertThat(job.status()).isEqualTo(OperationalJobStatus.FAILED); + } + + @Test + void testJobWithNegativeToken() + { + String negativeToken = "-9223372036854775808"; + NodeMoveJob job = new NodeMoveJob(jobId, negativeToken, mockStorageOperations); + assertThat(job.name()).isEqualTo(OPERATION_MOVE); + assertThat(job.jobId()).isEqualTo(jobId); + } +} From 366ed95153fe68a1aec60c5dd490444c3cb08e53 Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Tue, 4 Nov 2025 13:54:55 -0800 Subject: [PATCH 02/10] Updated node move API to accept newToken in request body --- .../common/request/NodeMoveRequest.java | 15 +++- .../request/data/NodeMoveRequestPayload.java | 79 ++++++++++++++++ .../data/NodeMoveRequestPayloadTest.java | 89 +++++++++++++++++++ .../sidecar/client/SidecarClientTest.java | 7 +- ...assandraNodeOperationsIntegrationTest.java | 24 ++--- .../sidecar/handlers/NodeMoveHandler.java | 33 +++---- .../modules/CassandraOperationsModule.java | 5 +- .../sidecar/handlers/NodeMoveHandlerTest.java | 60 +++++++------ 8 files changed, 258 insertions(+), 54 deletions(-) create mode 100644 client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java create mode 100644 client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java index 1c9e11741..2ea8b3254 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java @@ -20,6 +20,7 @@ import io.netty.handler.codec.http.HttpMethod; import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload; import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; /** @@ -27,6 +28,8 @@ */ public class NodeMoveRequest extends JsonRequest { + private final NodeMoveRequestPayload payload; + /** * Constructs a request to execute a node move operation * @@ -34,7 +37,8 @@ public class NodeMoveRequest extends JsonRequest */ public NodeMoveRequest(String newToken) { - super(ApiEndpointsV1.NODE_MOVE_ROUTE + "?newToken=" + newToken); + super(ApiEndpointsV1.NODE_MOVE_ROUTE); + this.payload = new NodeMoveRequestPayload(newToken); } /** @@ -45,4 +49,13 @@ public HttpMethod method() { return HttpMethod.PUT; } + + /** + * {@inheritDoc} + */ + @Override + public Object requestBody() + { + return payload; + } } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java new file mode 100644 index 000000000..f1f3cd3e6 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request.data; + +import java.math.BigInteger; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.utils.Preconditions; +import org.apache.cassandra.sidecar.common.utils.StringUtils; + +/** + * Request payload for node move operations. + * + *

Valid JSON:

+ *
+ *   { "newToken": "123456789" }
+ * 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class NodeMoveRequestPayload +{ + private final String newToken; + + /** + * @param newToken the new token for the node to move to + */ + @JsonCreator + public NodeMoveRequestPayload(@JsonProperty(value = "newToken", required = true) String newToken) + { + Preconditions.checkArgument(StringUtils.isNotEmpty(newToken), + "newToken must be provided and non-empty"); + + String trimmedToken = newToken.trim(); + try + { + new BigInteger(trimmedToken); + } + catch (NumberFormatException e) + { + throw new IllegalArgumentException( + String.format("newToken parameter must be a valid integer. Provided value=%s", newToken), e); + } + + this.newToken = trimmedToken; + } + + /** + * @return the new token for the node to move to + */ + @JsonProperty("newToken") + public String newToken() + { + return newToken; + } + + @Override + public String toString() + { + return "NodeMoveRequestPayload{newToken='" + newToken + "'}"; + } +} diff --git a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java new file mode 100644 index 000000000..913a1112e --- /dev/null +++ b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request.data; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link NodeMoveRequestPayload} + */ +public class NodeMoveRequestPayloadTest +{ + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + void testValidTokens() + { + NodeMoveRequestPayload payload1 = new NodeMoveRequestPayload("123456789"); + assertThat(payload1.newToken()).isEqualTo("123456789"); + + NodeMoveRequestPayload payload2 = new NodeMoveRequestPayload("-9223372036854775808"); + assertThat(payload2.newToken()).isEqualTo("-9223372036854775808"); + + NodeMoveRequestPayload payload3 = new NodeMoveRequestPayload("0"); + assertThat(payload3.newToken()).isEqualTo("0"); + } + + @Test + void testInvalidTokens() + { + assertThatThrownBy(() -> new NodeMoveRequestPayload("invalid")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("newToken parameter must be a valid integer"); + + assertThatThrownBy(() -> new NodeMoveRequestPayload("")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("newToken must be provided and non-empty"); + + assertThatThrownBy(() -> new NodeMoveRequestPayload(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("newToken must be provided and non-empty"); + } + + @Test + void testTokenTrimming() + { + NodeMoveRequestPayload payload = new NodeMoveRequestPayload(" 123456789 "); + assertThat(payload.newToken()).isEqualTo("123456789"); + } + + @Test + void testJsonSerialization() throws JsonProcessingException + { + NodeMoveRequestPayload payload = new NodeMoveRequestPayload("123456789"); + String json = objectMapper.writeValueAsString(payload); + assertThat(json).contains("\"newToken\":\"123456789\""); + + NodeMoveRequestPayload deserialized = objectMapper.readValue(json, NodeMoveRequestPayload.class); + assertThat(deserialized.newToken()).isEqualTo("123456789"); + } + + @Test + void testToString() + { + NodeMoveRequestPayload payload = new NodeMoveRequestPayload("123456789"); + assertThat(payload.toString()).contains("123456789"); + } +} diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index d38c8269c..ac0122bdf 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -1410,7 +1410,12 @@ public void testNodeMove() throws Exception OperationalJobResponse result = client.nodeMove(sidecarInstance, newToken).get(30, TimeUnit.SECONDS); assertThat(result).isNotNull(); assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); - validateResponseServed(ApiEndpointsV1.NODE_MOVE_ROUTE + "?newToken=" + newToken); + validateResponseServed(ApiEndpointsV1.NODE_MOVE_ROUTE, request -> { + // Verify that the request body contains the expected JSON payload + String requestBody = request.getBody().readUtf8(); + assertThat(requestBody).contains("\"newToken\":\"" + newToken + "\""); + assertThat(request.getHeader("Content-Type")).isEqualTo("application/json"); + }); } @Test diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java index 806e6b508..b5aaa3120 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java @@ -60,11 +60,13 @@ void testNodeMoveOperationSuccess() { // Use a test token - this is a valid token for Murmur3Partitioner String testToken = "123456789"; + String requestBody = "{\"newToken\":\"" + testToken + "\"}"; // Initiate move operation HttpResponse moveResponse = getBlocking( - trustedClient().put(serverWrapper.serverPort, "localhost", ApiEndpointsV1.NODE_MOVE_ROUTE + "?newToken=" + testToken) - .send()); + trustedClient().put(serverWrapper.serverPort, "localhost", ApiEndpointsV1.NODE_MOVE_ROUTE) + .putHeader("content-type", "application/json") + .sendBuffer(Buffer.buffer(requestBody))); assertThat(moveResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); @@ -91,7 +93,7 @@ void testNodeMoveOperationSuccess() // The operationMode should be either NORMAL (completed) or MOVING (in progress) assertThat(streamStats.getString("operationMode")).isIn("NORMAL", "MOVING"); }); - + // Validate the operational job status using the OperationalJobHandler String jobId = responseBody.getString("jobId"); validateOperationalJobStatus(jobId, "move"); @@ -101,19 +103,19 @@ void testNodeMoveOperationSuccess() * Validates the operational job status by querying the OperationalJobHandler endpoint * and waiting for the job to reach a final state if necessary. * - * @param jobId the ID of the operational job to validate + * @param jobId the ID of the operational job to validate * @param expectedOperation the expected operation name (e.g., "move", "decommission", "drain") */ private void validateOperationalJobStatus(String jobId, String expectedOperation) { String operationalJobRoute = ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId); - + HttpResponse jobStatusResponse = getBlocking( trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) .send()); - + assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code()); - + JsonObject jobStatusBody = jobStatusResponse.bodyAsJsonObject(); assertThat(jobStatusBody).isNotNull(); assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); @@ -122,7 +124,7 @@ private void validateOperationalJobStatus(String jobId, String expectedOperation OperationalJobStatus.RUNNING.name(), OperationalJobStatus.SUCCEEDED.name() ); - + // If the job is still running, wait for it to complete or reach a final state if (OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus"))) { @@ -130,9 +132,9 @@ private void validateOperationalJobStatus(String jobId, String expectedOperation HttpResponse finalJobStatusResponse = getBlocking( trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) .send()); - + assertThat(finalJobStatusResponse.statusCode()).isEqualTo(OK.code()); - + JsonObject finalJobStatusBody = finalJobStatusResponse.bodyAsJsonObject(); assertThat(finalJobStatusBody).isNotNull(); assertThat(finalJobStatusBody.getString("jobStatus")).isIn( @@ -158,4 +160,4 @@ protected void tearDown() throws Exception logger.error("Exception in tear down", ex); } } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java index 975e96b04..d81461975 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java @@ -18,17 +18,20 @@ package org.apache.cassandra.sidecar.handlers; -import java.math.BigInteger; import java.util.Collections; import java.util.Set; import com.datastax.driver.core.utils.UUIDs; import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; import io.vertx.core.net.SocketAddress; import io.vertx.ext.auth.authorization.Authorization; import io.vertx.ext.web.RoutingContext; import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.ServiceConfiguration; @@ -38,10 +41,12 @@ import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.jetbrains.annotations.NotNull; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + /** * Provides REST API for asynchronously moving the corresponding Cassandra node to a new token */ -public class NodeMoveHandler extends AbstractHandler implements AccessProtected +public class NodeMoveHandler extends AbstractHandler implements AccessProtected { private final OperationalJobManager jobManager; private final ServiceConfiguration config; @@ -79,10 +84,10 @@ public void handleInternal(RoutingContext context, HttpServerRequest httpRequest, @NotNull String host, SocketAddress remoteAddress, - String newToken) + NodeMoveRequestPayload requestPayload) { StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); - NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), newToken, operations); + NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), requestPayload.newToken(), operations); handleOperationalJob(jobManager, config, context, job); } @@ -90,24 +95,22 @@ public void handleInternal(RoutingContext context, * {@inheritDoc} */ @Override - protected String extractParamsOrThrow(RoutingContext context) + protected NodeMoveRequestPayload extractParamsOrThrow(RoutingContext context) { - String newToken = context.request().getParam("newToken"); - if (newToken == null || newToken.isBlank()) + String body = context.body().asString(); + if (body == null || body.equalsIgnoreCase("null")) { - throw new IllegalArgumentException("newToken parameter is required"); + logger.warn("Bad request. Received null payload."); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Request body must be JSON with a non-null \"newToken\" field"); } - - String trimmedToken = newToken.trim(); try { - new BigInteger(trimmedToken); + return Json.decodeValue(body, NodeMoveRequestPayload.class); } - catch (NumberFormatException e) + catch (DecodeException e) { - throw new IllegalArgumentException( - String.format("newToken parameter must be a valid integer. Provided value=%s", newToken), e); + logger.warn("Bad request. Received invalid JSON payload."); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid newToken value: " + e.getMessage()); } - return trimmedToken; } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java index d7a183469..a882f96af 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java @@ -174,7 +174,10 @@ VertxRoute cassandraNodeDecommissionRoute(RouteBuilder.Factory factory, VertxRoute cassandraNodeMoveRoute(RouteBuilder.Factory factory, NodeMoveHandler nodeMoveHandler) { - return factory.buildRouteWithHandler(nodeMoveHandler); + return factory.builderForRoute() + .setBodyHandler(true) + .handler(nodeMoveHandler) + .build(); } @GET diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java index 2337429a1..4e0170eff 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java @@ -117,10 +117,11 @@ void testMoveLongRunning(VertxTestContext context) .when(mockStorageOperations).move(anyString()); WebClient client = WebClient.create(vertx); - String testRoute = MOVE_ROUTE + "?newToken=123456789"; - client.put(server.actualPort(), LOCAL_HOST, testRoute) + String requestBody = "{\"newToken\":\"123456789\"}"; + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) .expect(ResponsePredicate.SC_ACCEPTED) - .send(context.succeeding(response -> { + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(ACCEPTED.code()); OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); assertThat(moveResponse).isNotNull(); @@ -135,9 +136,10 @@ void testMoveCompleted(VertxTestContext context) { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); WebClient client = WebClient.create(vertx); - String testRoute = MOVE_ROUTE + "?newToken=123456789"; - client.put(server.actualPort(), LOCAL_HOST, testRoute) - .send(context.succeeding(response -> { + String requestBody = "{\"newToken\":\"123456789\"}"; + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(OK.code()); LOGGER.info("Move Response: {}", response.bodyAsString()); @@ -156,10 +158,11 @@ void testMoveFailed(VertxTestContext context) when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); doThrow(new RuntimeException("Simulated failure")).when(mockStorageOperations).move(anyString()); WebClient client = WebClient.create(vertx); - String testRoute = MOVE_ROUTE + "?newToken=123456789"; - client.put(server.actualPort(), LOCAL_HOST, testRoute) + String requestBody = "{\"newToken\":\"123456789\"}"; + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) .expect(ResponsePredicate.SC_OK) - .send(context.succeeding(response -> { + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(OK.code()); context.completeNow(); })); @@ -170,10 +173,11 @@ void testMoveConflictAlreadyMoving(VertxTestContext context) { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_MOVING); WebClient client = WebClient.create(vertx); - String testRoute = MOVE_ROUTE + "?newToken=123456789"; - client.put(server.actualPort(), LOCAL_HOST, testRoute) + String requestBody = "{\"newToken\":\"123456789\"}"; + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) .expect(ResponsePredicate.SC_CONFLICT) - .send(context.succeeding(response -> { + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(CONFLICT.code()); LOGGER.info("Move Response: {}", response.bodyAsString()); OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); @@ -188,9 +192,11 @@ void testMoveConflictAlreadyMoving(VertxTestContext context) void testMoveWithMissingToken(VertxTestContext context) { WebClient client = WebClient.create(vertx); + String requestBody = "{}"; // Empty JSON body client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) .expect(ResponsePredicate.SC_BAD_REQUEST) - .send(context.succeeding(response -> { + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); verify(mockStorageOperations, never()).move(anyString()); context.completeNow(); @@ -201,10 +207,11 @@ void testMoveWithMissingToken(VertxTestContext context) void testMoveWithEmptyToken(VertxTestContext context) { WebClient client = WebClient.create(vertx); - String testRoute = MOVE_ROUTE + "?newToken="; // Empty token parameter - client.put(server.actualPort(), LOCAL_HOST, testRoute) + String requestBody = "{\"newToken\":\"\"}"; // Empty token in JSON + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) .expect(ResponsePredicate.SC_BAD_REQUEST) - .send(context.succeeding(response -> { + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); verify(mockStorageOperations, never()).move(anyString()); context.completeNow(); @@ -215,10 +222,11 @@ void testMoveWithEmptyToken(VertxTestContext context) void testMoveWithInvalidToken(VertxTestContext context) { WebClient client = WebClient.create(vertx); - String testRoute = MOVE_ROUTE + "?newToken=invalidtoken"; // Invalid token parameter - client.put(server.actualPort(), LOCAL_HOST, testRoute) + String requestBody = "{\"newToken\":\"invalidtoken\"}"; // Invalid token in JSON + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) .expect(ResponsePredicate.SC_BAD_REQUEST) - .send(context.succeeding(response -> { + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); verify(mockStorageOperations, never()).move(anyString()); context.completeNow(); @@ -230,9 +238,10 @@ void testMoveWithNegativeToken(VertxTestContext context) { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); WebClient client = WebClient.create(vertx); - String testRoute = MOVE_ROUTE + "?newToken=-9223372036854775808"; // Negative token (valid for Murmur3) - client.put(server.actualPort(), LOCAL_HOST, testRoute) - .send(context.succeeding(response -> { + String requestBody = "{\"newToken\":\"-9223372036854775808\"}"; // Negative token (valid for Murmur3) + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(OK.code()); OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); assertThat(moveResponse).isNotNull(); @@ -247,9 +256,10 @@ void testMoveWithZeroToken(VertxTestContext context) { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); WebClient client = WebClient.create(vertx); - String testRoute = MOVE_ROUTE + "?newToken=0"; // Zero token - client.put(server.actualPort(), LOCAL_HOST, testRoute) - .send(context.succeeding(response -> { + String requestBody = "{\"newToken\":\"0\"}"; // Zero token + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(OK.code()); OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); assertThat(moveResponse).isNotNull(); From 77501a2786ce44c9fe79a17c9cecfa1463c85bbc Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Tue, 4 Nov 2025 16:49:37 -0800 Subject: [PATCH 03/10] Fixed merge conflicts --- .../sidecar/client/RequestContext.java | 1 + .../sidecar/client/SidecarClientTest.java | 35 +++++++++++++++++++ .../sidecar/handlers/AbstractHandler.java | 35 ------------------- .../sidecar/handlers/NodeMoveHandler.java | 7 +++- .../modules/CassandraOperationsModule.java | 2 +- .../multibindings/VertxRouteMapKeys.java | 15 ++++++++ 6 files changed, 58 insertions(+), 37 deletions(-) diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java index 3575591ea..df94de6ad 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java @@ -91,6 +91,7 @@ public class RequestContext protected static final GossipInfoRequest GOSSIP_INFO_REQUEST = new GossipInfoRequest(); protected static final ListOperationalJobsRequest LIST_JOBS_REQUEST = new ListOperationalJobsRequest(); protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST = new NodeDecommissionRequest(); + protected static final NodeDrainRequest NODE_DRAIN_REQUEST = new NodeDrainRequest(); protected static final StreamStatsRequest STREAM_STATS_REQUEST = new StreamStatsRequest(); protected static final LifecycleInfoRequest LIFECYCLE_INFO_REQUEST = new LifecycleInfoRequest(); diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index 3c36d90b1..4490823d3 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -2059,6 +2059,41 @@ void testNodeUpdateLifecycle() throws Exception }); } + @Test + void testLiveMigrationStatusRequest() throws InterruptedException, ExecutionException + { + MockResponse response = new MockResponse(); + response.setResponseCode(200); + response.setBody("{\"state\":\"COMPLETED\",\"endTime\":1}"); + enqueue(response); + + SidecarInstance instance = instances.get(0); + + CompletableFuture result = client.liveMigrationStatus(instance); + + LiveMigrationStatus liveMigrationStatus = result.get(); + + assertThat(result.isCompletedExceptionally()).isFalse(); + assertThat(liveMigrationStatus).isEqualTo(new LiveMigrationStatus(MigrationState.COMPLETED, 1L)); + validateResponseServed(LIVE_MIGRATION_STATUS_ROUTE); + } + + @Test + void testLiveMigrationStatusRouteReturnedBadRequest() throws InterruptedException, ExecutionException + { + MockResponse response = new MockResponse(); + response.setResponseCode(400); + enqueue(response); + + SidecarInstance instance = instances.get(0); + + CompletableFuture result = client.liveMigrationStatus(instance); + + assertThatExceptionOfType(ExecutionException.class).isThrownBy(result::get); + assertThat(result.isCompletedExceptionally()).isTrue(); + validateResponseServed(LIVE_MIGRATION_STATUS_ROUTE); + } + private void enqueue(MockResponse response) { for (MockWebServer server : servers) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java index 72cc0a955..a7c4f7f69 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java @@ -29,21 +29,14 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.HttpException; import org.apache.cassandra.sidecar.adapters.base.exception.OperationUnavailableException; -import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; -import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.apache.cassandra.sidecar.common.server.exceptions.JmxAuthenticationException; import org.apache.cassandra.sidecar.common.utils.Preconditions; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; -import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; -import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; -import org.apache.cassandra.sidecar.job.OperationalJob; -import org.apache.cassandra.sidecar.job.OperationalJobManager; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import org.apache.cassandra.sidecar.utils.OperationalJobUtils; import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; @@ -324,32 +317,4 @@ public static String extractHostAddressWithoutPort(HttpServerRequest request) th } return host; } - - /** - * Handles the submission and execution of an operational job. - * - * @param jobManager the manager responsible for submitting and tracking operational jobs - * @param config the service configuration containing execution parameters - * @param context the routing context for the HTTP request/response - * @param job the operational job to be executed - */ - protected void handleOperationalJob(OperationalJobManager jobManager, ServiceConfiguration config, RoutingContext context, OperationalJob job) - { - try - { - jobManager.trySubmitJob(job); - } - catch (OperationalJobConflictException oje) - { - String reason = oje.getMessage(); - logger.error("Conflicting job encountered. reason={}", reason); - context.response().setStatusCode(HttpResponseStatus.CONFLICT.code()); - context.json(new OperationalJobResponse(job.jobId(), OperationalJobStatus.FAILED, job.name(), reason)); - return; - } - - // Get the result, waiting for the specified wait time for result - job.asyncResult(executorPools.service(), config.operationalJobExecutionMaxWaitTime()) - .onComplete(v -> OperationalJobUtils.sendStatusBasedResponse(context, job)); - } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java index d81461975..4fcb0be51 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java @@ -39,6 +39,7 @@ import org.apache.cassandra.sidecar.job.OperationalJobManager; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.OperationalJobUtils; import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; @@ -88,7 +89,11 @@ public void handleInternal(RoutingContext context, { StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), requestPayload.newToken(), operations); - handleOperationalJob(jobManager, config, context, job); + this.jobManager.trySubmitJob(job, + (completedJob, exception) -> + OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception), + executorPools.service(), + config.operationalJobExecutionMaxWaitTime()); } /** diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java index c85b20c09..572c897a2 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java @@ -45,8 +45,8 @@ import org.apache.cassandra.sidecar.handlers.ListOperationalJobsHandler; import org.apache.cassandra.sidecar.handlers.NativeUpdateHandler; import org.apache.cassandra.sidecar.handlers.NodeDecommissionHandler; -import org.apache.cassandra.sidecar.handlers.NodeMoveHandler; import org.apache.cassandra.sidecar.handlers.NodeDrainHandler; +import org.apache.cassandra.sidecar.handlers.NodeMoveHandler; import org.apache.cassandra.sidecar.handlers.OperationalJobHandler; import org.apache.cassandra.sidecar.handlers.RingHandler; import org.apache.cassandra.sidecar.handlers.SchemaHandler; diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java index 1736c3e4e..23a064538 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java @@ -263,6 +263,21 @@ interface LiveMigrationListInstanceFilesRouteKey extends RouteClassKey HttpMethod HTTP_METHOD = HttpMethod.GET; String ROUTE_URI = ApiEndpointsV1.LIVE_MIGRATION_FILES_ROUTE; } + interface LiveMigrationStatusRouteKey extends RouteClassKey + { + HttpMethod HTTP_METHOD = HttpMethod.GET; + String ROUTE_URI = ApiEndpointsV1.LIVE_MIGRATION_STATUS_ROUTE; + } + public interface LiveMigrationStatusUpdateRouteKey extends RouteClassKey + { + HttpMethod HTTP_METHOD = HttpMethod.POST; + String ROUTE_URI = ApiEndpointsV1.LIVE_MIGRATION_STATUS_ROUTE; + } + public interface LiveMigrationStatusDeleteRouteKey extends RouteClassKey + { + HttpMethod HTTP_METHOD = HttpMethod.DELETE; + String ROUTE_URI = ApiEndpointsV1.LIVE_MIGRATION_STATUS_ROUTE; + } interface SSTableCleanupRouteKey extends RouteClassKey { HttpMethod HTTP_METHOD = HttpMethod.DELETE; From 881126be82e287ef3bfef90e51f36cffeb822721 Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Tue, 4 Nov 2025 23:58:17 -0800 Subject: [PATCH 04/10] Addressed review comments --- .../adapters/base/utils/DataTypeUtils.java | 40 +++++- .../base/utils/DataTypeUtilsTest.java | 56 ++++++++ .../request/data/NodeMoveRequestPayload.java | 20 +-- .../data/NodeMoveRequestPayloadTest.java | 37 ----- ...assandraNodeOperationsIntegrationTest.java | 133 ++++++++++++++++-- .../sidecar/handlers/NodeMoveHandler.java | 22 ++- 6 files changed, 231 insertions(+), 77 deletions(-) diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java index e5dac604d..77434ae92 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java @@ -18,6 +18,8 @@ package org.apache.cassandra.sidecar.adapters.base.utils; +import java.math.BigInteger; + /** * Utility class for data type conversions. */ @@ -62,20 +64,20 @@ else if (value instanceof Long) * This method performs a runtime type check before casting to prevent ClassCastException * and provides meaningful error messages when the cast fails. * - * @param value the object to be cast - * @param expectedType the expected type to cast to + * @param value the object to be cast + * @param expectedType the expected type to cast to * @param contextDescription descriptive context for error messages (e.g., "keyspace name", "table data") - * @param the target type + * @param the target type * @return the cast object of type T * @throws IllegalStateException if the value is not an instance of the expected type, - * with a descriptive message indicating what was expected vs what was received + * with a descriptive message indicating what was expected vs what was received */ public static T safeCast(Object value, Class expectedType, String contextDescription) { if (!expectedType.isInstance(value)) { throw new ClassCastException("Expected " + expectedType.getSimpleName() + " for " + contextDescription + " but got: " + - (value == null ? "null" : value.getClass().getSimpleName())); + (value == null ? "null" : value.getClass().getSimpleName())); } return expectedType.cast(value); } @@ -84,11 +86,11 @@ public static T safeCast(Object value, Class expectedType, String context * Safely parses a string to a long with descriptive error handling. * This method handles null values and provides meaningful error messages when parsing fails. * - * @param value the string value to be parsed + * @param value the string value to be parsed * @param contextDescription descriptive context for error messages (e.g., "completed bytes", "total bytes") * @return the parsed long value * @throws IllegalStateException if the value cannot be parsed as a long, - * with a descriptive message indicating what failed to parse + * with a descriptive message indicating what failed to parse */ public static long safeParseLong(String value, String contextDescription) { @@ -119,4 +121,28 @@ public static long mebibytesToBytes(long mebibytes) { return mebibytes << 20; } + + /** + * Validates whether a string can be parsed as a valid BigInteger. + * This method trims whitespace from the input string before validation. + * + * @param newToken the string to validate as a BigInteger, can be null + * @return true if the string can be successfully parsed as a BigInteger, false if null or invalid + */ + public static boolean isValidBigInt(String newToken) + { + if (newToken == null) + { + return false; + } + try + { + new BigInteger(newToken.trim()); + return true; + } + catch (NumberFormatException e) + { + return false; + } + } } diff --git a/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtilsTest.java b/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtilsTest.java index d0b83dec4..069eba854 100644 --- a/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtilsTest.java +++ b/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtilsTest.java @@ -144,4 +144,60 @@ void testMebibytesToBytes(String testCase, long megabytes, long expectedBytes) long result = DataTypeUtils.mebibytesToBytes(megabytes); assertEquals(expectedBytes, result); } + + static Stream isValidBigIntTestCases() + { + return Stream.of( + // Test null input - should return false + Arguments.of("null input", null, false), + // Test valid positive integer + Arguments.of("valid positive integer", "123", true), + // Test valid negative integer + Arguments.of("valid negative integer", "-456", true), + // Test zero + Arguments.of("valid zero", "0", true), + // Test very large positive number (beyond Long.MAX_VALUE) + Arguments.of("valid large positive number", "12345678901234567890123456789", true), + // Test very large negative number (beyond Long.MIN_VALUE) + Arguments.of("valid large negative number", "-98765432109876543210987654321", true), + // Test number with leading whitespace - should be valid after trim + Arguments.of("valid with leading whitespace", " 789", true), + // Test number with trailing whitespace - should be valid after trim + Arguments.of("valid with trailing whitespace", "321 ", true), + // Test number with leading and trailing whitespace - should be valid after trim + Arguments.of("valid with leading and trailing whitespace", " 999 ", true), + // Test number with just whitespace and sign + Arguments.of("valid negative with whitespace", " -555 ", true), + // Test empty string - should be invalid + Arguments.of("invalid empty string", "", false), + // Test string with only whitespace - should be invalid + Arguments.of("invalid whitespace only", " ", false), + // Test string with alphabetic characters + Arguments.of("invalid alphabetic", "abc", false), + // Test string with mixed alphanumeric + Arguments.of("invalid alphanumeric", "123abc", false), + // Test string with special characters + Arguments.of("invalid special characters", "123!@#", false), + // Test string with decimal point + Arguments.of("invalid decimal", "123.45", false), + // Test string with scientific notation + Arguments.of("invalid scientific notation", "1e5", false), + // Test string with multiple signs + Arguments.of("invalid multiple signs", "--123", false), + // Test string with sign in middle + Arguments.of("invalid sign in middle", "12-3", false), + // Test string with plus sign (valid) + Arguments.of("valid with plus sign", "+123", true), + // Test string with hex prefix + Arguments.of("invalid hex prefix", "0x123", false) + ); + } + + @ParameterizedTest + @MethodSource("isValidBigIntTestCases") + void testIsValidBigInt(String testCase, String input, boolean expectedResult) + { + boolean result = DataTypeUtils.isValidBigInt(input); + assertEquals(expectedResult, result); + } } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java index f1f3cd3e6..6e7990a5c 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java @@ -18,13 +18,9 @@ package org.apache.cassandra.sidecar.common.request.data; -import java.math.BigInteger; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.cassandra.sidecar.common.utils.Preconditions; -import org.apache.cassandra.sidecar.common.utils.StringUtils; /** * Request payload for node move operations. @@ -45,21 +41,7 @@ public class NodeMoveRequestPayload @JsonCreator public NodeMoveRequestPayload(@JsonProperty(value = "newToken", required = true) String newToken) { - Preconditions.checkArgument(StringUtils.isNotEmpty(newToken), - "newToken must be provided and non-empty"); - - String trimmedToken = newToken.trim(); - try - { - new BigInteger(trimmedToken); - } - catch (NumberFormatException e) - { - throw new IllegalArgumentException( - String.format("newToken parameter must be a valid integer. Provided value=%s", newToken), e); - } - - this.newToken = trimmedToken; + this.newToken = newToken; } /** diff --git a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java index 913a1112e..8cfa07d3d 100644 --- a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java +++ b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for {@link NodeMoveRequestPayload} @@ -33,42 +32,6 @@ public class NodeMoveRequestPayloadTest { private final ObjectMapper objectMapper = new ObjectMapper(); - @Test - void testValidTokens() - { - NodeMoveRequestPayload payload1 = new NodeMoveRequestPayload("123456789"); - assertThat(payload1.newToken()).isEqualTo("123456789"); - - NodeMoveRequestPayload payload2 = new NodeMoveRequestPayload("-9223372036854775808"); - assertThat(payload2.newToken()).isEqualTo("-9223372036854775808"); - - NodeMoveRequestPayload payload3 = new NodeMoveRequestPayload("0"); - assertThat(payload3.newToken()).isEqualTo("0"); - } - - @Test - void testInvalidTokens() - { - assertThatThrownBy(() -> new NodeMoveRequestPayload("invalid")) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("newToken parameter must be a valid integer"); - - assertThatThrownBy(() -> new NodeMoveRequestPayload("")) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("newToken must be provided and non-empty"); - - assertThatThrownBy(() -> new NodeMoveRequestPayload(null)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("newToken must be provided and non-empty"); - } - - @Test - void testTokenTrimming() - { - NodeMoveRequestPayload payload = new NodeMoveRequestPayload(" 123456789 "); - assertThat(payload.newToken()).isEqualTo("123456789"); - } - @Test void testJsonSerialization() throws JsonProcessingException { diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java index bb1ad020d..a36257bbc 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java @@ -27,7 +27,10 @@ import io.vertx.ext.web.client.HttpResponse; import org.apache.cassandra.sidecar.common.ApiEndpointsV1; import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.response.RingResponse; +import org.apache.cassandra.sidecar.common.response.data.RingEntry; import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; import static io.netty.handler.codec.http.HttpResponseStatus.OK; @@ -42,6 +45,14 @@ public class CassandraNodeOperationsIntegrationTest extends SharedClusterSidecar { public static final String CASSANDRA_VERSION_4_0 = "4.0"; + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration() + .dcCount(1) + .nodesPerDc(3); + } + @Override protected void initializeSchemaForTest() { @@ -89,7 +100,7 @@ void testNodeDrainOperationSuccess() // Validate the operational job status using the OperationalJobHandler String jobId = responseBody.getString("jobId"); - validateOperationalJobStatus(jobId, "drain"); + validateOperationalJobStatus(jobId, "drain", OperationalJobStatus.SUCCEEDED); } @@ -100,6 +111,10 @@ void testNodeMoveOperationSuccess() String testToken = "123456789"; String requestBody = "{\"newToken\":\"" + testToken + "\"}"; + // Validate that the node owns a different token than testToken + String currentToken = getCurrentTokenForNode("localhost"); + assertThat(currentToken).isNotEqualTo(testToken); + // Initiate move operation HttpResponse moveResponse = getBlocking( trustedClient().put(serverWrapper.serverPort, "localhost", ApiEndpointsV1.NODE_MOVE_ROUTE) @@ -134,17 +149,111 @@ void testNodeMoveOperationSuccess() // Validate the operational job status using the OperationalJobHandler String jobId = responseBody.getString("jobId"); - validateOperationalJobStatus(jobId, "move"); + validateOperationalJobStatus(jobId, "move", OperationalJobStatus.SUCCEEDED); + + // Validate that the node actually owns the new token + currentToken = getCurrentTokenForNode("localhost"); + assertThat(currentToken).isEqualTo(testToken); + } + + /** + * Tests the failure case of node move operation when attempting to move to a token + * already owned by another node in the cluster. + *

+ * This test validates that: + * - The system properly rejects invalid move operations that would create token conflicts + * - The move operation fails with OperationalJobStatus.FAILED when targeting an existing token + * - The original node retains its initial token after the failed move attempt + *

+ * Token conflicts must be prevented to maintain cluster integrity, as having multiple + * nodes own the same token would break the consistent hashing ring and cause data + * distribution issues. + */ + @Test + void testNodeMoveOperationFailure() + { + // Get a token already owned by a node + String testToken = getCurrentTokenForNode("localhost2"); + String requestBody = "{\"newToken\":\"" + testToken + "\"}"; + + // Validate that the node owns a different token than testToken + String initialToken = getCurrentTokenForNode("localhost"); + assertThat(initialToken).isNotEqualTo(testToken); + + // Initiate move operation + HttpResponse moveResponse = getBlocking( + trustedClient().put(serverWrapper.serverPort, "localhost", ApiEndpointsV1.NODE_MOVE_ROUTE) + .putHeader("content-type", "application/json") + .sendBuffer(Buffer.buffer(requestBody))); + + assertThat(moveResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = moveResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); + assertThat(responseBody.getString("jobId")).isNotNull(); + assertThat(responseBody.getString("operation")).isEqualTo("move"); + assertThat(responseBody.getString("jobStatus")).isIn( + OperationalJobStatus.CREATED.name(), + OperationalJobStatus.RUNNING.name(), + OperationalJobStatus.FAILED.name() + ); + + // Verify the job eventually completes (or at least gets processed) + loopAssert(30, 500, () -> { + HttpResponse streamStatsResponse = getBlocking( + trustedClient().get(serverWrapper.serverPort, "localhost", ApiEndpointsV1.STREAM_STATS_ROUTE) + .send()); + + assertThat(streamStatsResponse.statusCode()).isEqualTo(OK.code()); + + JsonObject streamStats = streamStatsResponse.bodyAsJsonObject(); + assertThat(streamStats).isNotNull(); + // The operationMode should be either NORMAL (completed) or MOVING (in progress) + assertThat(streamStats.getString("operationMode")).isIn("NORMAL", "MOVING"); + }); + + // Validate the operational job status using the OperationalJobHandler + String jobId = responseBody.getString("jobId"); + validateOperationalJobStatus(jobId, "move", OperationalJobStatus.FAILED); + + // Validate that the node didn't move + String currentToken = getCurrentTokenForNode("localhost"); + assertThat(currentToken).isEqualTo(initialToken); + assertThat(currentToken).isNotEqualTo(testToken); + } + + /** + * Gets the current token for the specified node by querying the ring endpoint. + * + * @param node the node hostname to get the token for + * @return the token currently owned by the specified node + */ + private String getCurrentTokenForNode(String node) + { + HttpResponse ringResponse = getBlocking( + trustedClient().get(serverWrapper.serverPort, node, ApiEndpointsV1.RING_ROUTE) + .send()); + + assertThat(ringResponse.statusCode()).isEqualTo(OK.code()); + + RingResponse ring = ringResponse.bodyAsJson(RingResponse.class); + assertThat(ring).isNotNull(); + + RingEntry ringEntry = ring.stream() + .filter(entry -> entry.fqdn().equals(node)) + .findFirst() + .orElseThrow(() -> new AssertionError("Node " + node + " not found in ring")); + return ringEntry.token(); } /** * Validates the operational job status by querying the OperationalJobHandler endpoint * and waiting for the job to reach a final state if necessary. * - * @param jobId the ID of the operational job to validate + * @param jobId the ID of the operational job to validate * @param expectedOperation the expected operation name (e.g., "move", "decommission", "drain") */ - private void validateOperationalJobStatus(String jobId, String expectedOperation) + private void validateOperationalJobStatus(String jobId, String expectedOperation, OperationalJobStatus expectedEndStatus) { String operationalJobRoute = ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId); @@ -158,10 +267,6 @@ private void validateOperationalJobStatus(String jobId, String expectedOperation assertThat(jobStatusBody).isNotNull(); assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation); - assertThat(jobStatusBody.getString("jobStatus")).isIn( - OperationalJobStatus.RUNNING.name(), - OperationalJobStatus.SUCCEEDED.name() - ); // If the job is still running, wait for it to complete or reach a final state if (OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus"))) @@ -181,6 +286,18 @@ private void validateOperationalJobStatus(String jobId, String expectedOperation ); }); } + + jobStatusResponse = getBlocking( + trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) + .send()); + + assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code()); + + jobStatusBody = jobStatusResponse.bodyAsJsonObject(); + assertThat(jobStatusBody).isNotNull(); + assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); + assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation); + assertThat(jobStatusBody.getString("jobStatus")).isEqualTo(expectedEndStatus.name()); } /** diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java index 4fcb0be51..e7e80b274 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java @@ -31,8 +31,10 @@ import io.vertx.ext.auth.authorization.Authorization; import io.vertx.ext.web.RoutingContext; import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils; import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload; import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.utils.StringUtils; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.job.NodeMoveJob; @@ -47,7 +49,7 @@ /** * Provides REST API for asynchronously moving the corresponding Cassandra node to a new token */ -public class NodeMoveHandler extends AbstractHandler implements AccessProtected +public class NodeMoveHandler extends AbstractHandler implements AccessProtected { private final OperationalJobManager jobManager; private final ServiceConfiguration config; @@ -85,10 +87,10 @@ public void handleInternal(RoutingContext context, HttpServerRequest httpRequest, @NotNull String host, SocketAddress remoteAddress, - NodeMoveRequestPayload requestPayload) + String newToken) { StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); - NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), requestPayload.newToken(), operations); + NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), newToken, operations); this.jobManager.trySubmitJob(job, (completedJob, exception) -> OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception), @@ -100,7 +102,7 @@ public void handleInternal(RoutingContext context, * {@inheritDoc} */ @Override - protected NodeMoveRequestPayload extractParamsOrThrow(RoutingContext context) + protected String extractParamsOrThrow(RoutingContext context) { String body = context.body().asString(); if (body == null || body.equalsIgnoreCase("null")) @@ -110,12 +112,20 @@ protected NodeMoveRequestPayload extractParamsOrThrow(RoutingContext context) } try { - return Json.decodeValue(body, NodeMoveRequestPayload.class); + NodeMoveRequestPayload payload = Json.decodeValue(body, NodeMoveRequestPayload.class); + String newToken = payload.newToken(); + if (StringUtils.isNullOrEmpty(newToken) || !DataTypeUtils.isValidBigInt(newToken)) + { + throw new IllegalArgumentException( + String.format("newToken value must be a valid number. Provided value=%s", newToken)); + } + return newToken.trim(); } catch (DecodeException e) { logger.warn("Bad request. Received invalid JSON payload."); - throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid newToken value: " + e.getMessage()); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Failed to parse NodeMoveRequestPayload error=" + e.getMessage()); } } } From 0373271852fb56515fe6ed664792bef4675a0292 Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Mon, 15 Dec 2025 07:41:04 -0800 Subject: [PATCH 05/10] Addressed review comments --- .../base/CassandraStorageOperations.java | 2 +- .../GossipDependentStorageJmxOperations.java | 2 +- .../base/jmx/StorageJmxOperations.java | 2 +- .../request/data/NodeMoveRequestPayload.java | 4 +- .../common/server/StorageOperations.java | 2 +- .../sidecar/handlers/NodeMoveHandler.java | 4 +- .../cassandra/sidecar/job/NodeMoveJob.java | 2 +- .../cassandra/sidecar/job/OperationalJob.java | 2 +- .../modules/CassandraOperationsModule.java | 4 ++ .../sidecar/handlers/NodeMoveHandlerTest.java | 68 ++++++++++++++++--- .../sidecar/job/NodeMoveJobTest.java | 7 +- 11 files changed, 78 insertions(+), 21 deletions(-) diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java index cd66c4e56..275dbb8a5 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java @@ -338,7 +338,7 @@ public int getCompactionThroughputMbPerSec() * {@inheritDoc} */ @Override - public void move(String newToken) + public void move(String newToken) throws IOException { jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) .move(newToken); diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java index 5fa0788e9..ae7f561d7 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java @@ -227,7 +227,7 @@ public int getCompactionThroughputMbPerSec() } @Override - public void move(String newToken) + public void move(String newToken) throws IOException { delegate.move(newToken); } diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java index 56868cf72..ced40624b 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java @@ -239,5 +239,5 @@ public interface StorageJmxOperations * * @param newToken the new token for the node to move to */ - void move(String newToken); + void move(String newToken) throws IOException; } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java index 6e7990a5c..3254c1493 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java @@ -18,6 +18,8 @@ package org.apache.cassandra.sidecar.common.request.data; +import java.util.Objects; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @@ -41,7 +43,7 @@ public class NodeMoveRequestPayload @JsonCreator public NodeMoveRequestPayload(@JsonProperty(value = "newToken", required = true) String newToken) { - this.newToken = newToken; + this.newToken = Objects.requireNonNull(newToken, "newToken is required"); } /** diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java index 027dd94b8..163a30c97 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java @@ -185,5 +185,5 @@ default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String tab * * @param newToken the new token for the node to move to */ - void move(String newToken); + void move(String newToken) throws IOException; } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java index e7e80b274..f04b59f4e 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java @@ -23,6 +23,7 @@ import com.datastax.driver.core.utils.UUIDs; import com.google.inject.Inject; +import com.google.inject.Singleton; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.json.DecodeException; @@ -49,6 +50,7 @@ /** * Provides REST API for asynchronously moving the corresponding Cassandra node to a new token */ +@Singleton public class NodeMoveHandler extends AbstractHandler implements AccessProtected { private final OperationalJobManager jobManager; @@ -107,7 +109,6 @@ protected String extractParamsOrThrow(RoutingContext context) String body = context.body().asString(); if (body == null || body.equalsIgnoreCase("null")) { - logger.warn("Bad request. Received null payload."); throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Request body must be JSON with a non-null \"newToken\" field"); } try @@ -123,7 +124,6 @@ protected String extractParamsOrThrow(RoutingContext context) } catch (DecodeException e) { - logger.warn("Bad request. Received invalid JSON payload."); throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Failed to parse NodeMoveRequestPayload error=" + e.getMessage()); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java index f9f3cbf8d..f03d63389 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java @@ -54,7 +54,7 @@ public boolean isRunningOnCassandra() * {@inheritDoc} */ @Override - protected void executeInternal() + protected void executeInternal() throws Exception { LOGGER.info("Executing move operation. jobId={} newToken={}", this.jobId(), newToken); storageOperations.move(newToken); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java index c4b6b9a34..04fa5d0c5 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java @@ -189,7 +189,7 @@ public Future asyncResult(TaskExecutorPool executorPool, DurationSpec wait /** * OperationalJob body. The implementation is executed in a blocking manner. */ - protected abstract void executeInternal(); + protected abstract void executeInternal() throws Exception; /** * Execute the job behavior as specified in the internal execution {@link #executeInternal()}, diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java index 8465a1497..8f7a2f6e4 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java @@ -193,6 +193,10 @@ VertxRoute cassandraNodeDrainRoute(RouteBuilder.Factory factory, responseCode = "202", content = @Content(mediaType = "application/json", schema = @Schema(implementation = OperationalJobResponse.class))) + @APIResponse(description = "Conflicting node move job encountered", + responseCode = "409", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = OperationalJobResponse.class))) @ProvidesIntoMap @KeyClassMapKey(VertxRouteMapKeys.CassandraNodeMoveRouteKey.class) VertxRoute cassandraNodeMoveRoute(RouteBuilder.Factory factory, diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java index 4e0170eff..c653df35f 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.sidecar.handlers; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -110,7 +111,7 @@ void after() throws InterruptedException } @Test - void testMoveLongRunning(VertxTestContext context) + void testMoveLongRunning(VertxTestContext context) throws IOException { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null)) @@ -147,13 +148,20 @@ void testMoveCompleted(VertxTestContext context) assertThat(moveResponse).isNotNull(); assertThat(moveResponse.status()).isEqualTo(SUCCEEDED); assertThat(moveResponse.operation()).isEqualTo("move"); - verify(mockStorageOperations).move("123456789"); + try + { + verify(mockStorageOperations).move("123456789"); + } + catch (IOException e) + { + throw new RuntimeException(e); + } context.completeNow(); })); } @Test - void testMoveFailed(VertxTestContext context) + void testMoveFailed(VertxTestContext context) throws IOException { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); doThrow(new RuntimeException("Simulated failure")).when(mockStorageOperations).move(anyString()); @@ -183,7 +191,14 @@ void testMoveConflictAlreadyMoving(VertxTestContext context) OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); assertThat(moveResponse).isNotNull(); assertThat(moveResponse.jobId()).isNotNull(); - verify(mockStorageOperations, never()).move(anyString()); // Should not call move when already moving + try + { + verify(mockStorageOperations, never()).move(anyString()); // Should not call move when already moving + } + catch (IOException e) + { + throw new RuntimeException(e); + } context.completeNow(); })); } @@ -198,7 +213,14 @@ void testMoveWithMissingToken(VertxTestContext context) .putHeader("content-type", "application/json") .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); - verify(mockStorageOperations, never()).move(anyString()); + try + { + verify(mockStorageOperations, never()).move(anyString()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } context.completeNow(); })); } @@ -213,7 +235,14 @@ void testMoveWithEmptyToken(VertxTestContext context) .putHeader("content-type", "application/json") .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); - verify(mockStorageOperations, never()).move(anyString()); + try + { + verify(mockStorageOperations, never()).move(anyString()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } context.completeNow(); })); } @@ -228,7 +257,14 @@ void testMoveWithInvalidToken(VertxTestContext context) .putHeader("content-type", "application/json") .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); - verify(mockStorageOperations, never()).move(anyString()); + try + { + verify(mockStorageOperations, never()).move(anyString()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } context.completeNow(); })); } @@ -246,7 +282,14 @@ void testMoveWithNegativeToken(VertxTestContext context) OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); assertThat(moveResponse).isNotNull(); assertThat(moveResponse.status()).isEqualTo(SUCCEEDED); - verify(mockStorageOperations).move("-9223372036854775808"); + try + { + verify(mockStorageOperations).move("-9223372036854775808"); + } + catch (IOException e) + { + throw new RuntimeException(e); + } context.completeNow(); })); } @@ -264,7 +307,14 @@ void testMoveWithZeroToken(VertxTestContext context) OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); assertThat(moveResponse).isNotNull(); assertThat(moveResponse.status()).isEqualTo(SUCCEEDED); - verify(mockStorageOperations).move("0"); + try + { + verify(mockStorageOperations).move("0"); + } + catch (IOException e) + { + throw new RuntimeException(e); + } context.completeNow(); })); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java index 50ab789ba..ecd4271be 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.sidecar.job; +import java.io.IOException; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; @@ -103,7 +104,7 @@ void testStatusWhenNormal() } @Test - void testStatusWhenFailed() + void testStatusWhenFailed() throws IOException { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); RuntimeException testException = new RuntimeException("Test failure"); @@ -119,7 +120,7 @@ void testStatusWhenFailed() } @Test - void testExecuteInternalCallsMove() + void testExecuteInternalCallsMove() throws IOException { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); @@ -132,7 +133,7 @@ void testExecuteInternalCallsMove() } @Test - void testExecuteInternalHandlesException() + void testExecuteInternalHandlesException() throws IOException { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); RuntimeException testException = new RuntimeException("Test exception"); From 8153996efef962403706f00b93d786aaa0a05071 Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Mon, 22 Dec 2025 08:36:23 -0800 Subject: [PATCH 06/10] Updated validation for token --- .../adapters/base/utils/DataTypeUtils.java | 26 --------- .../base/utils/DataTypeUtilsTest.java | 56 ------------------ .../sidecar/handlers/NodeMoveHandler.java | 16 +++-- .../sidecar/handlers/NodeMoveHandlerTest.java | 58 ++++++++++++++++++- 4 files changed, 68 insertions(+), 88 deletions(-) diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java index 77434ae92..edd777e83 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java @@ -18,8 +18,6 @@ package org.apache.cassandra.sidecar.adapters.base.utils; -import java.math.BigInteger; - /** * Utility class for data type conversions. */ @@ -121,28 +119,4 @@ public static long mebibytesToBytes(long mebibytes) { return mebibytes << 20; } - - /** - * Validates whether a string can be parsed as a valid BigInteger. - * This method trims whitespace from the input string before validation. - * - * @param newToken the string to validate as a BigInteger, can be null - * @return true if the string can be successfully parsed as a BigInteger, false if null or invalid - */ - public static boolean isValidBigInt(String newToken) - { - if (newToken == null) - { - return false; - } - try - { - new BigInteger(newToken.trim()); - return true; - } - catch (NumberFormatException e) - { - return false; - } - } } diff --git a/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtilsTest.java b/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtilsTest.java index 069eba854..d0b83dec4 100644 --- a/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtilsTest.java +++ b/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtilsTest.java @@ -144,60 +144,4 @@ void testMebibytesToBytes(String testCase, long megabytes, long expectedBytes) long result = DataTypeUtils.mebibytesToBytes(megabytes); assertEquals(expectedBytes, result); } - - static Stream isValidBigIntTestCases() - { - return Stream.of( - // Test null input - should return false - Arguments.of("null input", null, false), - // Test valid positive integer - Arguments.of("valid positive integer", "123", true), - // Test valid negative integer - Arguments.of("valid negative integer", "-456", true), - // Test zero - Arguments.of("valid zero", "0", true), - // Test very large positive number (beyond Long.MAX_VALUE) - Arguments.of("valid large positive number", "12345678901234567890123456789", true), - // Test very large negative number (beyond Long.MIN_VALUE) - Arguments.of("valid large negative number", "-98765432109876543210987654321", true), - // Test number with leading whitespace - should be valid after trim - Arguments.of("valid with leading whitespace", " 789", true), - // Test number with trailing whitespace - should be valid after trim - Arguments.of("valid with trailing whitespace", "321 ", true), - // Test number with leading and trailing whitespace - should be valid after trim - Arguments.of("valid with leading and trailing whitespace", " 999 ", true), - // Test number with just whitespace and sign - Arguments.of("valid negative with whitespace", " -555 ", true), - // Test empty string - should be invalid - Arguments.of("invalid empty string", "", false), - // Test string with only whitespace - should be invalid - Arguments.of("invalid whitespace only", " ", false), - // Test string with alphabetic characters - Arguments.of("invalid alphabetic", "abc", false), - // Test string with mixed alphanumeric - Arguments.of("invalid alphanumeric", "123abc", false), - // Test string with special characters - Arguments.of("invalid special characters", "123!@#", false), - // Test string with decimal point - Arguments.of("invalid decimal", "123.45", false), - // Test string with scientific notation - Arguments.of("invalid scientific notation", "1e5", false), - // Test string with multiple signs - Arguments.of("invalid multiple signs", "--123", false), - // Test string with sign in middle - Arguments.of("invalid sign in middle", "12-3", false), - // Test string with plus sign (valid) - Arguments.of("valid with plus sign", "+123", true), - // Test string with hex prefix - Arguments.of("invalid hex prefix", "0x123", false) - ); - } - - @ParameterizedTest - @MethodSource("isValidBigIntTestCases") - void testIsValidBigInt(String testCase, String input, boolean expectedResult) - { - boolean result = DataTypeUtils.isValidBigInt(input); - assertEquals(expectedResult, result); - } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java index f04b59f4e..4e04b2857 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java @@ -32,7 +32,6 @@ import io.vertx.ext.auth.authorization.Authorization; import io.vertx.ext.web.RoutingContext; import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; -import org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils; import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.utils.StringUtils; @@ -53,6 +52,8 @@ @Singleton public class NodeMoveHandler extends AbstractHandler implements AccessProtected { + private static final int MAX_TOKEN_LENGTH = 128; + private final OperationalJobManager jobManager; private final ServiceConfiguration config; @@ -115,12 +116,19 @@ protected String extractParamsOrThrow(RoutingContext context) { NodeMoveRequestPayload payload = Json.decodeValue(body, NodeMoveRequestPayload.class); String newToken = payload.newToken(); - if (StringUtils.isNullOrEmpty(newToken) || !DataTypeUtils.isValidBigInt(newToken)) + if (StringUtils.isNullOrEmpty(newToken)) + { + throw new IllegalArgumentException("newToken value cannot be null or empty"); + } + + String trimmedToken = newToken.trim(); + if (trimmedToken.length() >= MAX_TOKEN_LENGTH) { throw new IllegalArgumentException( - String.format("newToken value must be a valid number. Provided value=%s", newToken)); + String.format("newToken value must be less than %d characters. Provided value length=%d", + MAX_TOKEN_LENGTH, trimmedToken.length())); } - return newToken.trim(); + return trimmedToken; } catch (DecodeException e) { diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java index c653df35f..1ad876637 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java @@ -248,10 +248,12 @@ void testMoveWithEmptyToken(VertxTestContext context) } @Test - void testMoveWithInvalidToken(VertxTestContext context) + void testMoveWithInvalidTokenTooLong(VertxTestContext context) { WebClient client = WebClient.create(vertx); - String requestBody = "{\"newToken\":\"invalidtoken\"}"; // Invalid token in JSON + // Create a token string that is exactly 128 characters (should fail) + String longToken = "a".repeat(128); + String requestBody = "{\"newToken\":\"" + longToken + "\"}"; client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) .expect(ResponsePredicate.SC_BAD_REQUEST) .putHeader("content-type", "application/json") @@ -269,6 +271,58 @@ void testMoveWithInvalidToken(VertxTestContext context) })); } + @Test + void testMoveWithValidAlphanumericToken(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + WebClient client = WebClient.create(vertx); + String requestBody = "{\"newToken\":\"validtoken123\"}"; // Valid alphanumeric token + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(moveResponse).isNotNull(); + assertThat(moveResponse.status()).isEqualTo(SUCCEEDED); + try + { + verify(mockStorageOperations).move("validtoken123"); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + context.completeNow(); + })); + } + + @Test + void testMoveWithTokenAtMaxLength(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); + WebClient client = WebClient.create(vertx); + // Create a token string that is 127 characters (should pass) + String maxLengthToken = "a".repeat(127); + String requestBody = "{\"newToken\":\"" + maxLengthToken + "\"}"; + client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE) + .putHeader("content-type", "application/json") + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(moveResponse).isNotNull(); + assertThat(moveResponse.status()).isEqualTo(SUCCEEDED); + try + { + verify(mockStorageOperations).move(maxLengthToken); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + context.completeNow(); + })); + } + @Test void testMoveWithNegativeToken(VertxTestContext context) { From faf140794c942cb1743441d178bed787dba0f1cc Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Mon, 22 Dec 2025 10:41:22 -0800 Subject: [PATCH 07/10] Added documentation for MAX_TOKEN_LENGTH --- .../apache/cassandra/sidecar/handlers/NodeMoveHandler.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java index 4e04b2857..c76798695 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java @@ -52,6 +52,13 @@ @Singleton public class NodeMoveHandler extends AbstractHandler implements AccessProtected { + /** + * Maximum allowed length for token strings in characters. + *

+ * This limit is set conservatively to accommodate all valid token formats, including + * the string representation of {@code Long.MIN_VALUE} (-9223372036854775808, 20 characters) + * with substantial margin for other token representations or future extensions. + */ private static final int MAX_TOKEN_LENGTH = 128; private final OperationalJobManager jobManager; From 28cdd8d81fb9c8a029f0b9440185dfe0b90864c1 Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Mon, 5 Jan 2026 12:25:11 -0800 Subject: [PATCH 08/10] Debugging test failure --- .../sidecar/routes/CassandraNodeOperationsIntegrationTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java index a36257bbc..1caedd6a1 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java @@ -127,6 +127,7 @@ void testNodeMoveOperationSuccess() assertThat(responseBody).isNotNull(); assertThat(responseBody.getString("jobId")).isNotNull(); assertThat(responseBody.getString("operation")).isEqualTo("move"); + logger.error("jobStatus for testNodeMoveOperationSuccess:{}", responseBody.getString("jobStatus")); assertThat(responseBody.getString("jobStatus")).isIn( OperationalJobStatus.CREATED.name(), OperationalJobStatus.RUNNING.name(), @@ -208,6 +209,7 @@ void testNodeMoveOperationFailure() JsonObject streamStats = streamStatsResponse.bodyAsJsonObject(); assertThat(streamStats).isNotNull(); + logger.error("operationMode for testNodeMoveOperationFailure:{}", streamStats.getString("operationMode")); // The operationMode should be either NORMAL (completed) or MOVING (in progress) assertThat(streamStats.getString("operationMode")).isIn("NORMAL", "MOVING"); }); From ae1a3cc5ad444216cd228398fc8c3022b8f658d3 Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Thu, 8 Jan 2026 16:10:26 -0800 Subject: [PATCH 09/10] Fixed test failure for trunk --- .../routes/CassandraNodeOperationsIntegrationTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java index 1caedd6a1..5566342d6 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java @@ -25,6 +25,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.HttpResponse; +import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.common.ApiEndpointsV1; import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; import org.apache.cassandra.sidecar.common.response.RingResponse; @@ -50,7 +51,8 @@ protected ClusterBuilderConfiguration testClusterConfiguration() { return super.testClusterConfiguration() .dcCount(1) - .nodesPerDc(3); + .nodesPerDc(3) + .requestFeature(Feature.NETWORK); } @Override @@ -127,7 +129,6 @@ void testNodeMoveOperationSuccess() assertThat(responseBody).isNotNull(); assertThat(responseBody.getString("jobId")).isNotNull(); assertThat(responseBody.getString("operation")).isEqualTo("move"); - logger.error("jobStatus for testNodeMoveOperationSuccess:{}", responseBody.getString("jobStatus")); assertThat(responseBody.getString("jobStatus")).isIn( OperationalJobStatus.CREATED.name(), OperationalJobStatus.RUNNING.name(), @@ -209,7 +210,6 @@ void testNodeMoveOperationFailure() JsonObject streamStats = streamStatsResponse.bodyAsJsonObject(); assertThat(streamStats).isNotNull(); - logger.error("operationMode for testNodeMoveOperationFailure:{}", streamStats.getString("operationMode")); // The operationMode should be either NORMAL (completed) or MOVING (in progress) assertThat(streamStats.getString("operationMode")).isIn("NORMAL", "MOVING"); }); From 88ec9c7b04840e10bc33cfd22ee6588a04960b7b Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Fri, 9 Jan 2026 00:41:15 -0800 Subject: [PATCH 10/10] Addressed review comments --- .../org/apache/cassandra/sidecar/client/RequestContext.java | 2 +- .../apache/cassandra/sidecar/client/SidecarClientTest.java | 2 +- .../cassandra/sidecar/handlers/NodeMoveHandlerTest.java | 6 ++++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java index bbc006aa2..c3442373a 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java @@ -618,7 +618,7 @@ public Builder nodeDrainRequest() * @param newToken the new token for the node to move to * @return a reference to this Builder */ - public Builder nodeMoveRequest(String newToken) + public Builder nodeMoveRequest(@NotNull String newToken) { return request(new NodeMoveRequest(newToken)); } diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index 40365bd05..032a937c5 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -1473,7 +1473,7 @@ public void testNodeMove() throws Exception { UUID jobId = UUID.randomUUID(); String newToken = "123456789"; - String nodeMoveString = "{\"jobId\":\"" + jobId + "\",\"jobStatus\":\"SUCCEEDED\",\"instance\":\"127.0.0.1\"}"; + String nodeMoveString = "{\"jobId\":\"" + jobId + "\",\"jobStatus\":\"SUCCEEDED\"}"; MockResponse response = new MockResponse() .setResponseCode(OK.code()) diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java index 1ad876637..1ff8294cc 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java @@ -56,6 +56,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED; import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; import static org.assertj.core.api.Assertions.assertThat; @@ -172,6 +173,11 @@ void testMoveFailed(VertxTestContext context) throws IOException .putHeader("content-type", "application/json") .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody), context.succeeding(response -> { assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse moveResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(moveResponse).isNotNull(); + assertThat(moveResponse.status()).isEqualTo(FAILED); + assertThat(moveResponse.operation()).isEqualTo("move"); + assertThat(moveResponse.reason()).isEqualTo("Simulated failure"); context.completeNow(); })); }