Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.3.0
-----
* Sidecar endpoint for moving a node to a new token (CASSSIDECAR-344)
* Returning JSON responses for live migration status endpoints in case of errors (CASSSIDECAR-395)
* Upgrade vertx to 4.5.23 (CASSSIDECAR-391)
* Fix for deadlock during JMX reconnection (CASSSIDECAR-390)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,4 +344,14 @@ public int getCompactionThroughputMbPerSec()
return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
.getCompactionThroughputMbPerSec();
}

/**
* {@inheritDoc}
*/
@Override
public void move(String newToken) throws IOException
{
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
.move(newToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,10 @@ public int getCompactionThroughputMbPerSec()
{
return delegate.getCompactionThroughputMbPerSec();
}

@Override
public void move(String newToken) throws IOException
{
delegate.move(newToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,11 @@ public interface StorageJmxOperations
* @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined
*/
int getCompactionThroughputMbPerSec();

/**
* Triggers the node move operation to move this node to a new token
*
* @param newToken the new token for the node to move to
*/
void move(String newToken) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,20 @@ else if (value instanceof Long)
* This method performs a runtime type check before casting to prevent ClassCastException
* and provides meaningful error messages when the cast fails.
*
* @param value the object to be cast
* @param expectedType the expected type to cast to
* @param value the object to be cast
* @param expectedType the expected type to cast to
* @param contextDescription descriptive context for error messages (e.g., "keyspace name", "table data")
* @param <T> the target type
* @param <T> the target type
* @return the cast object of type T
* @throws IllegalStateException if the value is not an instance of the expected type,
* with a descriptive message indicating what was expected vs what was received
* with a descriptive message indicating what was expected vs what was received
*/
public static <T> T safeCast(Object value, Class<T> expectedType, String contextDescription)
{
if (!expectedType.isInstance(value))
{
throw new ClassCastException("Expected " + expectedType.getSimpleName() + " for " + contextDescription + " but got: " +
(value == null ? "null" : value.getClass().getSimpleName()));
(value == null ? "null" : value.getClass().getSimpleName()));
}
return expectedType.cast(value);
}
Expand All @@ -84,11 +84,11 @@ public static <T> T safeCast(Object value, Class<T> expectedType, String context
* Safely parses a string to a long with descriptive error handling.
* This method handles null values and provides meaningful error messages when parsing fails.
*
* @param value the string value to be parsed
* @param value the string value to be parsed
* @param contextDescription descriptive context for error messages (e.g., "completed bytes", "total bytes")
* @return the parsed long value
* @throws IllegalStateException if the value cannot be parsed as a long,
* with a descriptive message indicating what failed to parse
* with a descriptive message indicating what failed to parse
*/
public static long safeParseLong(String value, String contextDescription)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public final class ApiEndpointsV1
public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS;
public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB;
public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + "/operations/decommission";
public static final String NODE_MOVE_ROUTE = API_V1 + CASSANDRA + "/operations/move";
public static final String NODE_DRAIN_ROUTE = API_V1 + CASSANDRA + "/operations/drain";
public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams";
public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + PER_TABLE + "/stats";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.common.request;

import io.netty.handler.codec.http.HttpMethod;
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload;
import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;

/**
* Represents a request to execute node move operation
*/
public class NodeMoveRequest extends JsonRequest<OperationalJobResponse>
{
private final NodeMoveRequestPayload payload;

/**
* Constructs a request to execute a node move operation
*
* @param newToken the new token for the node to move to
*/
public NodeMoveRequest(String newToken)
{
super(ApiEndpointsV1.NODE_MOVE_ROUTE);
this.payload = new NodeMoveRequestPayload(newToken);
}

/**
* {@inheritDoc}
*/
@Override
public HttpMethod method()
{
return HttpMethod.PUT;
}

/**
* {@inheritDoc}
*/
@Override
public Object requestBody()
{
return payload;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.common.request.data;

import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Request payload for node move operations.
*
* <p>Valid JSON:</p>
* <pre>
* { "newToken": "123456789" }
* </pre>
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class NodeMoveRequestPayload
{
private final String newToken;

/**
* @param newToken the new token for the node to move to
*/
@JsonCreator
public NodeMoveRequestPayload(@JsonProperty(value = "newToken", required = true) String newToken)
{
this.newToken = Objects.requireNonNull(newToken, "newToken is required");
}

/**
* @return the new token for the node to move to
*/
@JsonProperty("newToken")
public String newToken()
{
return newToken;
}

@Override
public String toString()
{
return "NodeMoveRequestPayload{newToken='" + newToken + "'}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.common.request.data;

import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for {@link NodeMoveRequestPayload}
*/
public class NodeMoveRequestPayloadTest
{
private final ObjectMapper objectMapper = new ObjectMapper();

@Test
void testJsonSerialization() throws JsonProcessingException
{
NodeMoveRequestPayload payload = new NodeMoveRequestPayload("123456789");
String json = objectMapper.writeValueAsString(payload);
assertThat(json).contains("\"newToken\":\"123456789\"");

NodeMoveRequestPayload deserialized = objectMapper.readValue(json, NodeMoveRequestPayload.class);
assertThat(deserialized.newToken()).isEqualTo("123456789");
}

@Test
void testToString()
{
NodeMoveRequestPayload payload = new NodeMoveRequestPayload("123456789");
assertThat(payload.toString()).contains("123456789");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.cassandra.sidecar.common.request.NativeUpdateRequest;
import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest;
import org.apache.cassandra.sidecar.common.request.NodeDrainRequest;
import org.apache.cassandra.sidecar.common.request.NodeMoveRequest;
import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest;
import org.apache.cassandra.sidecar.common.request.OperationalJobRequest;
import org.apache.cassandra.sidecar.common.request.ReportSchemaRequest;
Expand Down Expand Up @@ -610,6 +611,18 @@ public Builder nodeDrainRequest()
return request(NODE_DRAIN_REQUEST);
}

/**
* Sets the {@code request} to be a {@link NodeMoveRequest} and returns a reference to this Builder
* enabling method chaining.
*
* @param newToken the new token for the node to move to
* @return a reference to this Builder
*/
public Builder nodeMoveRequest(@NotNull String newToken)
{
return request(new NodeMoveRequest(newToken));
}

/**
* Sets the {@code request} to be a {@link GossipUpdateRequest} for the
* given {@link NodeCommandRequestPayload.State state}, and returns a reference to this Builder enabling method chaining.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,21 @@ public CompletableFuture<OperationalJobResponse> nodeDrain(SidecarInstance insta
.build());
}

/**
* Executes the node move request using the default retry policy and configured selection policy
*
* @param instance the instance where the request will be executed
* @param newToken the new token for the node to move to
* @return a completable future of the operational job response
*/
public CompletableFuture<OperationalJobResponse> nodeMove(SidecarInstance instance, String newToken)
{
return executor.executeRequestAsync(requestBuilder()
.singleInstanceSelectionPolicy(instance)
.nodeMoveRequest(newToken)
.build());
}

/**
* Sends a request to start or stop Cassandra gossiping on the provided instance.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1468,6 +1468,31 @@ public void testNodeDrain() throws Exception
validateResponseServed(ApiEndpointsV1.NODE_DRAIN_ROUTE);
}

@Test
public void testNodeMove() throws Exception
{
UUID jobId = UUID.randomUUID();
String newToken = "123456789";
String nodeMoveString = "{\"jobId\":\"" + jobId + "\",\"jobStatus\":\"SUCCEEDED\"}";

MockResponse response = new MockResponse()
.setResponseCode(OK.code())
.setHeader("content-type", "application/json")
.setBody(nodeMoveString);
enqueue(response);

SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(servers.get(0));
OperationalJobResponse result = client.nodeMove(sidecarInstance, newToken).get(30, TimeUnit.SECONDS);
assertThat(result).isNotNull();
assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
validateResponseServed(ApiEndpointsV1.NODE_MOVE_ROUTE, request -> {
// Verify that the request body contains the expected JSON payload
String requestBody = request.getBody().readUtf8();
assertThat(requestBody).contains("\"newToken\":\"" + newToken + "\"");
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
});
}

@Test
void testFailsWithOneAttemptPerServer()
{
Expand Down
Loading