From 5e00d19c5162174de736aa152a08eea9df7abe26 Mon Sep 17 00:00:00 2001 From: Bernardo Botella Corbi Date: Mon, 3 Nov 2025 14:42:10 +0100 Subject: [PATCH 1/2] CassandraInstance support for multiple tokens --- .../sidecar/SidecarCdcCommitLogSegment.java | 2 +- .../SidecarCommitLogProviderTests.java | 24 ++--- .../org/apache/cassandra/cdc/CdcTests.java | 2 +- .../apache/cassandra/cdc/LocalCommitLog.java | 3 +- .../cdc/api/CommitLogMarkerTests.java | 33 +++---- .../cdc/model/CdcKryoSerializationTests.java | 7 +- .../spark/data/model/TokenOwner.java | 8 +- .../data/partitioner/CassandraInstance.java | 32 +++++-- .../spark/data/partitioner/CassandraRing.java | 28 ++++-- .../cassandra/spark/utils/RangeUtils.java | 8 +- .../data/partitioner/CassandraRingTests.java | 91 ++++++++++--------- .../cassandra/spark/utils/RangeUtilsTest.java | 3 +- .../spark/bulkwriter/RingInstance.java | 5 +- .../spark/data/CassandraDataLayer.java | 2 +- .../spark/data/PartitionedDataLayer.java | 8 +- .../data/partitioner/MultipleReplicas.java | 8 +- .../spark/data/partitioner/SingleReplica.java | 4 +- .../spark/KryoSerializationTests.java | 3 +- .../org/apache/cassandra/spark/TestUtils.java | 2 +- .../data/partitioner/SingleReplicaTests.java | 2 +- 20 files changed, 157 insertions(+), 118 deletions(-) diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java index 30662e29f..c39c344c1 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java @@ -147,7 +147,7 @@ public String toString() return '{' + "\"node\": \"" + instance.nodeName() + "\"," + "\"dc\": \"" + instance.dataCenter() + "\"," + - "\"token\": \"" + instance.token() + "\"," + + "\"tokens\": \"" + instance.tokens() + "\"," + "\"log\": \"" + segment.name + "\"," + "\"idx\": \"" + segment.idx + "\"," + "\"size\": \"" + segment.size + '"' + diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCommitLogProviderTests.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCommitLogProviderTests.java index 83da4f83e..952c2e896 100644 --- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCommitLogProviderTests.java +++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCommitLogProviderTests.java @@ -50,18 +50,18 @@ public class SidecarCommitLogProviderTests { private static final List INSTANCES = Arrays.asList( - new CassandraInstance("0", "local1", "DC1"), - new CassandraInstance("100", "local2", "DC1"), - new CassandraInstance("200", "local3", "DC1"), - new CassandraInstance("300", "local4", "DC1"), - new CassandraInstance("400", "local5", "DC1"), - new CassandraInstance("500", "local6", "DC1"), - new CassandraInstance("1", "local7", "DC2"), - new CassandraInstance("101", "local8", "DC2"), - new CassandraInstance("201", "local9", "DC2"), - new CassandraInstance("301", "local10", "DC2"), - new CassandraInstance("401", "local11", "DC2"), - new CassandraInstance("501", "local12", "DC2") + new CassandraInstance(Set.of("0"), "local1", "DC1"), + new CassandraInstance(Set.of("100"), "local2", "DC1"), + new CassandraInstance(Set.of("200"), "local3", "DC1"), + new CassandraInstance(Set.of("300"), "local4", "DC1"), + new CassandraInstance(Set.of("400"), "local5", "DC1"), + new CassandraInstance(Set.of("500"), "local6", "DC1"), + new CassandraInstance(Set.of("1"), "local7", "DC2"), + new CassandraInstance(Set.of("101"), "local8", "DC2"), + new CassandraInstance(Set.of("201"), "local9", "DC2"), + new CassandraInstance(Set.of("301"), "local10", "DC2"), + new CassandraInstance(Set.of("401"), "local11", "DC2"), + new CassandraInstance(Set.of("501"), "local12", "DC2") ); @Test diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java index 3e2377846..09d154e30 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java @@ -295,7 +295,7 @@ public List loadState(String jobId, int partitionId, @Nullable TokenRa long numSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime); assertThat(endState.epoch >= Math.max(0, numSeconds - 4)).isTrue(); // epochs should be around ~ 1 per second assertThat(endState.replicaCount.isEmpty()).isTrue(); - Marker endMarker = endState.markers.startMarker(new CassandraInstance("0", "local-instance", "DC1")); + Marker endMarker = endState.markers.startMarker(new CassandraInstance(Set.of("0"), "local-instance", "DC1")); assertThat(logProvider(directory).logs().map(CommitLog::segmentId).collect(Collectors.toSet()).contains(endMarker.segmentId)).isTrue(); } finally diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java index da7e06f0c..7937d1fce 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.Set; import org.apache.cassandra.cdc.api.CommitLog; import org.apache.cassandra.spark.data.FileSystemSource; @@ -41,7 +42,7 @@ public LocalCommitLog(Path path) this.name = path.getFileName().toString(); this.path = path; this.length = IOUtils.size(path); - this.instance = new CassandraInstance("0", "local-instance", "DC1"); + this.instance = new CassandraInstance(Set.of("0"), "local-instance", "DC1"); } public String name() diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/api/CommitLogMarkerTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/api/CommitLogMarkerTests.java index ae80aa484..b5fd7452a 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/api/CommitLogMarkerTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/api/CommitLogMarkerTests.java @@ -26,6 +26,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.math.BigInteger; +import java.util.Set; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; @@ -42,7 +43,7 @@ public class CommitLogMarkerTests @Test public void testEmpty() { - CassandraInstance inst = new CassandraInstance("0", "local1-i1", "DC1"); + CassandraInstance inst = new CassandraInstance(Set.of("0"), "local1-i1", "DC1"); Marker marker = CommitLogMarkers.EMPTY.startMarker(inst); assertThat(marker.segmentId).isEqualTo(0); assertThat(marker.position).isEqualTo(0); @@ -54,9 +55,9 @@ public void testEmpty() @Test public void testPerInstance() { - CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1"); - CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1"); - CassandraInstance inst3 = new CassandraInstance("2", "local3-i1", "DC1"); + CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1"); + CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1"); + CassandraInstance inst3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1"); CommitLogMarkers markers = CommitLogMarkers.of( ImmutableMap.of( @@ -77,9 +78,9 @@ public void testPerInstance() @Test public void testPerRange() { - CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1"); - CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1"); - CassandraInstance inst3 = new CassandraInstance("2", "local3-i1", "DC1"); + CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1"); + CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1"); + CassandraInstance inst3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1"); // build per range commit log markers PerRangeCommitLogMarkers.PerRangeBuilder builder = CommitLogMarkers.perRangeBuilder(); @@ -126,7 +127,7 @@ public void testPerRange() @Test public void testIsBefore() { - CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1"); + CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1"); assertThat(inst1.zeroMarker().isBefore(inst1.zeroMarker())).isFalse(); @@ -146,8 +147,8 @@ public void testIsBefore() public void testIsBeforeException() { assertThatThrownBy(() -> { - CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1"); - CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1"); + CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1"); + CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1"); inst1.zeroMarker().isBefore(inst2.zeroMarker()); }).isInstanceOf(IllegalArgumentException.class); } @@ -155,9 +156,9 @@ public void testIsBeforeException() @Test public void testPerInstanceJdkSerialization() { - CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1"); - CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1"); - CassandraInstance inst3 = new CassandraInstance("2", "local3-i1", "DC1"); + CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1"); + CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1"); + CassandraInstance inst3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1"); CommitLogMarkers markers = CommitLogMarkers.of( ImmutableMap.of( @@ -179,9 +180,9 @@ public void testPerInstanceJdkSerialization() @Test public void testPerRangeJdkSerialization() { - CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1"); - CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1"); - CassandraInstance inst3 = new CassandraInstance("2", "local3-i1", "DC1"); + CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1"); + CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1"); + CassandraInstance inst3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1"); PerRangeCommitLogMarkers.PerRangeBuilder builder = CommitLogMarkers.perRangeBuilder(); builder.add(TokenRange.closed(BigInteger.ZERO, BigInteger.valueOf(5000)), inst1.markerAt(500, 10000)); diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/model/CdcKryoSerializationTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/model/CdcKryoSerializationTests.java index 66c076cbf..29ea37eab 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/model/CdcKryoSerializationTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/model/CdcKryoSerializationTests.java @@ -20,6 +20,7 @@ package org.apache.cassandra.cdc.model; import java.math.BigInteger; +import java.util.Set; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; @@ -39,9 +40,9 @@ public class CdcKryoSerializationTests { - public static final CassandraInstance INST_1 = new CassandraInstance("0", "local1-i1", "DC1"); - public static final CassandraInstance INST_2 = new CassandraInstance("1", "local2-i1", "DC1"); - public static final CassandraInstance INST_3 = new CassandraInstance("2", "local3-i1", "DC1"); + public static final CassandraInstance INST_1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1"); + public static final CassandraInstance INST_2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1"); + public static final CassandraInstance INST_3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1"); public static final PartitionUpdateWrapper.Digest DIGEST_1 = new PartitionUpdateWrapper.Digest("ks1", "tb1", diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/model/TokenOwner.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/model/TokenOwner.java index f9119b7ee..f0cf0fdd5 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/model/TokenOwner.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/model/TokenOwner.java @@ -19,13 +19,15 @@ package org.apache.cassandra.spark.data.model; +import java.util.Set; + /** - * Token owner owns a token + * Token owner owns a set of tokens */ public interface TokenOwner { /** - * @return the token it owns + * @return the tokens it owns */ - String token(); + Set tokens(); } diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java index 85b5ebdb6..752aa9714 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java @@ -20,7 +20,9 @@ package org.apache.cassandra.spark.data.partitioner; import java.io.Serializable; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; @@ -34,20 +36,20 @@ public class CassandraInstance implements TokenOwner, Serializable public static final CassandraInstance.Serializer SERIALIZER = new CassandraInstance.Serializer(); private static final long serialVersionUID = 6767636627576239773L; - private final String token; + private final Set tokens; private final String node; private final String dataCenter; - public CassandraInstance(String token, String node, String dataCenter) + public CassandraInstance(Set tokens, String node, String dataCenter) { - this.token = token; + this.tokens = tokens; this.node = node; this.dataCenter = dataCenter; } - public String token() + public Set tokens() { - return token; + return tokens; } public String nodeName() @@ -83,7 +85,7 @@ public boolean equals(Object other) } CassandraInstance that = (CassandraInstance) other; - return Objects.equals(this.token, that.token) + return Objects.equals(this.tokens, that.tokens) && Objects.equals(this.node, that.node) && Objects.equals(this.dataCenter, that.dataCenter); } @@ -91,13 +93,13 @@ public boolean equals(Object other) @Override public int hashCode() { - return Objects.hash(token, node, dataCenter); + return Objects.hash(tokens, node, dataCenter); } @Override public String toString() { - return String.format("{\"token\"=\"%s\", \"node\"=\"%s\", \"dc\"=\"%s\"}", token, node, dataCenter); + return String.format("{\"tokens\"=\"%s\", \"node\"=\"%s\", \"dc\"=\"%s\"}", tokens, node, dataCenter); } public static class Serializer extends com.esotericsoftware.kryo.Serializer @@ -105,13 +107,23 @@ public static class Serializer extends com.esotericsoftware.kryo.Serializer tokens = new HashSet<>(); + int numTokens = in.readInt(); + for (int i = 0; i < numTokens; i++) + { + tokens.add(in.readString()); + } + return new CassandraInstance(tokens, in.readString(), in.readString()); } @Override public void write(Kryo kryo, Output out, CassandraInstance instance) { - out.writeString(instance.token()); + out.writeInt(instance.tokens().size()); + for (String token : instance.tokens()) + { + out.writeString(token); + } out.writeString(instance.nodeName()); out.writeString(instance.dataCenter()); } diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java index 3d142f8df..7cae49855 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java @@ -29,8 +29,10 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.Objects; @@ -115,7 +117,9 @@ public CassandraRing(Partitioner partitioner, this.keyspace = keyspace; this.replicationFactor = replicationFactor; this.instances = instances.stream() - .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) + .sorted(Comparator.comparing(i -> i.tokens().stream() + .map(BigInteger::new).collect(Collectors.toList()) + .stream().min(BigInteger::compareTo).get())) .collect(Collectors.toCollection(ArrayList::new)); this.init(); } @@ -209,8 +213,8 @@ private Collection dataCenters() public Collection tokens() { return instances.stream() - .map(CassandraInstance::token) - .map(BigInteger::new) + .map(i -> i.tokens().stream().map(BigInteger::new).collect(Collectors.toList())) + .flatMap(List::stream) .sorted() .collect(Collectors.toList()); } @@ -221,8 +225,8 @@ public Collection tokens(String dataCenter) "Datacenter tokens doesn't make sense for SimpleStrategy"); return instances.stream() .filter(instance -> instance.dataCenter().matches(dataCenter)) - .map(CassandraInstance::token) - .map(BigInteger::new) + .map(i -> i.tokens().stream().map(BigInteger::new).collect(Collectors.toList())) + .flatMap(List::stream) .collect(Collectors.toList()); } @@ -276,7 +280,13 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE this.instances = new ArrayList<>(numInstances); for (int instance = 0; instance < numInstances; instance++) { - this.instances.add(new CassandraInstance(in.readUTF(), in.readUTF(), in.readUTF())); + Set tokens = new HashSet<>(); + int numTokens = in.readShort(); + for (int i = 0; i < numTokens; i++) + { + tokens.add(in.readUTF()); + } + this.instances.add(new CassandraInstance(tokens, in.readUTF(), in.readUTF())); } this.init(); } @@ -299,7 +309,11 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFou out.writeShort(this.instances.size()); for (CassandraInstance instance : this.instances) { - out.writeUTF(instance.token()); + out.writeShort(instance.tokens().size()); + for (String token : instance.tokens()) + { + out.writeUTF(token); + } out.writeUTF(instance.nodeName()); out.writeUTF(instance.dataCenter()); } diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RangeUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RangeUtils.java index 67912d36c..1671e5e56 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RangeUtils.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RangeUtils.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -156,8 +157,11 @@ public static List> split(Range range, int nrSplit { Instance instance = instances.get(index); int disjointReplica = ((instances.size() + index) - replicationFactor) % instances.size(); - BigInteger rangeStart = new BigInteger(instances.get(disjointReplica).token()); - BigInteger rangeEnd = new BigInteger(instance.token()); + Set tokensStart = instances.get(disjointReplica).tokens(); + Set tokensEnd = instance.tokens(); + + BigInteger rangeStart = tokensStart.stream().map(BigInteger::new).min(BigInteger::compareTo).get(); + BigInteger rangeEnd = tokensEnd.stream().map(BigInteger::new).max(BigInteger::compareTo).get(); // If start token is greater than or equal to end token we are looking at a wrap around range, split it if (rangeStart.compareTo(rangeEnd) >= 0) diff --git a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java index c66290f0f..37f6a186d 100644 --- a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java +++ b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Set; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; @@ -70,9 +71,9 @@ private void validateRanges(Collection> ranges, @Test public void testSimpleStrategyRF3() { - List instances = Arrays.asList(new CassandraInstance("0", "local0-i1", "DEV"), - new CassandraInstance("100", "local0-i2", "DEV"), - new CassandraInstance("200", "local0-i3", "DEV")); + List instances = Arrays.asList(new CassandraInstance(Set.of("0"), "local0-i1", "DEV"), + new CassandraInstance(Set.of("100"), "local0-i2", "DEV"), + new CassandraInstance(Set.of("200"), "local0-i3", "DEV")); CassandraRing ring = new CassandraRing(Partitioner.Murmur3Partitioner, "test", new ReplicationFactor(ImmutableMap.of( @@ -96,9 +97,9 @@ public void testSimpleStrategyRF3() @Test public void testSimpleStrategyRF1() { - List instances = Arrays.asList(new CassandraInstance("0", "local0-i1", "DEV"), - new CassandraInstance("100", "local0-i2", "DEV"), - new CassandraInstance("200", "local0-i3", "DEV")); + List instances = Arrays.asList(new CassandraInstance(Set.of("0"), "local0-i1", "DEV"), + new CassandraInstance(Set.of("100"), "local0-i2", "DEV"), + new CassandraInstance(Set.of("200"), "local0-i3", "DEV")); CassandraRing ring = new CassandraRing(Partitioner.Murmur3Partitioner, "test", new ReplicationFactor(ImmutableMap.of( @@ -148,9 +149,9 @@ public void testSimpleStrategyRF1() @Test public void testSimpleStrategyRF2() { - List instances = Arrays.asList(new CassandraInstance("0", "local0-i1", "DEV"), - new CassandraInstance("100", "local0-i2", "DEV"), - new CassandraInstance("200", "local0-i3", "DEV")); + List instances = Arrays.asList(new CassandraInstance(Set.of("0"), "local0-i1", "DEV"), + new CassandraInstance(Set.of("100"), "local0-i2", "DEV"), + new CassandraInstance(Set.of("200"), "local0-i3", "DEV")); CassandraRing ring = new CassandraRing(Partitioner.Murmur3Partitioner, "test", new ReplicationFactor(ImmutableMap.of( @@ -198,12 +199,12 @@ public void testSimpleStrategyRF2() @Test public void testNetworkStrategyRF33() { - List instances = Arrays.asList(new CassandraInstance("0", "local0-i1", "DC1"), - new CassandraInstance("100", "local0-i2", "DC1"), - new CassandraInstance("200", "local0-i3", "DC1"), - new CassandraInstance("1", "local1-i1", "DC2"), - new CassandraInstance("101", "local1-i2", "DC2"), - new CassandraInstance("201", "local1-i3", "DC2")); + List instances = Arrays.asList(new CassandraInstance(Set.of("0"), "local0-i1", "DC1"), + new CassandraInstance(Set.of("100"), "local0-i2", "DC1"), + new CassandraInstance(Set.of("200"), "local0-i3", "DC1"), + new CassandraInstance(Set.of("1"), "local1-i1", "DC2"), + new CassandraInstance(Set.of("101"), "local1-i2", "DC2"), + new CassandraInstance(Set.of("201"), "local1-i3", "DC2")); CassandraRing ring = new CassandraRing( Partitioner.Murmur3Partitioner, @@ -211,12 +212,12 @@ public void testNetworkStrategyRF33() new ReplicationFactor(ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "DC1", "3", "DC2", "3")), - Arrays.asList(new CassandraInstance("0", "local0-i1", "DC1"), - new CassandraInstance("100", "local0-i2", "DC1"), - new CassandraInstance("200", "local0-i3", "DC1"), - new CassandraInstance("1", "local1-i1", "DC2"), - new CassandraInstance("101", "local1-i2", "DC2"), - new CassandraInstance("201", "local1-i3", "DC2"))); + Arrays.asList(new CassandraInstance(Set.of("0"), "local0-i1", "DC1"), + new CassandraInstance(Set.of("100"), "local0-i2", "DC1"), + new CassandraInstance(Set.of("200"), "local0-i3", "DC1"), + new CassandraInstance(Set.of("1"), "local1-i1", "DC2"), + new CassandraInstance(Set.of("101"), "local1-i2", "DC2"), + new CassandraInstance(Set.of("201"), "local1-i3", "DC2"))); assertThat(ring.tokens().toArray()).isEqualTo(Arrays.asList(BigInteger.valueOf(0L), BigInteger.valueOf(1L), @@ -245,12 +246,12 @@ public void testNetworkStrategyRF33() @Test public void testNetworkStrategyRF11() { - List instances = Arrays.asList(new CassandraInstance("0", "local0-i1", "DC1"), - new CassandraInstance("100", "local0-i2", "DC1"), - new CassandraInstance("200", "local0-i3", "DC1"), - new CassandraInstance("1", "local1-i1", "DC2"), - new CassandraInstance("101", "local1-i2", "DC2"), - new CassandraInstance("201", "local1-i3", "DC2")); + List instances = Arrays.asList(new CassandraInstance(Set.of("0"), "local0-i1", "DC1"), + new CassandraInstance(Set.of("100"), "local0-i2", "DC1"), + new CassandraInstance(Set.of("200"), "local0-i3", "DC1"), + new CassandraInstance(Set.of("1"), "local1-i1", "DC2"), + new CassandraInstance(Set.of("101"), "local1-i2", "DC2"), + new CassandraInstance(Set.of("201"), "local1-i3", "DC2")); CassandraRing ring = new CassandraRing( Partitioner.Murmur3Partitioner, @@ -258,12 +259,12 @@ public void testNetworkStrategyRF11() new ReplicationFactor(ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "DC1", "1", "DC2", "1")), - Arrays.asList(new CassandraInstance("0", "local0-i1", "DC1"), - new CassandraInstance("100", "local0-i2", "DC1"), - new CassandraInstance("200", "local0-i3", "DC1"), - new CassandraInstance("1", "local1-i1", "DC2"), - new CassandraInstance("101", "local1-i2", "DC2"), - new CassandraInstance("201", "local1-i3", "DC2"))); + Arrays.asList(new CassandraInstance(Set.of("0"), "local0-i1", "DC1"), + new CassandraInstance(Set.of("100"), "local0-i2", "DC1"), + new CassandraInstance(Set.of("200"), "local0-i3", "DC1"), + new CassandraInstance(Set.of("1"), "local1-i1", "DC2"), + new CassandraInstance(Set.of("101"), "local1-i2", "DC2"), + new CassandraInstance(Set.of("201"), "local1-i3", "DC2"))); assertThat(ring.tokens().toArray()).isEqualTo(Arrays.asList(BigInteger.valueOf(0L), BigInteger.valueOf(1L), @@ -350,12 +351,12 @@ public void testNetworkStrategyRF11() @Test public void testNetworkStrategyRF22() { - List instances = Arrays.asList(new CassandraInstance("0", "local0-i1", "DC1"), - new CassandraInstance("100", "local0-i2", "DC1"), - new CassandraInstance("200", "local0-i3", "DC1"), - new CassandraInstance("1", "local1-i1", "DC2"), - new CassandraInstance("101", "local1-i2", "DC2"), - new CassandraInstance("201", "local1-i3", "DC2")); + List instances = Arrays.asList(new CassandraInstance(Set.of("0"), "local0-i1", "DC1"), + new CassandraInstance(Set.of("100"), "local0-i2", "DC1"), + new CassandraInstance(Set.of("200"), "local0-i3", "DC1"), + new CassandraInstance(Set.of("1"), "local1-i1", "DC2"), + new CassandraInstance(Set.of("101"), "local1-i2", "DC2"), + new CassandraInstance(Set.of("201"), "local1-i3", "DC2")); CassandraRing ring = new CassandraRing( Partitioner.Murmur3Partitioner, @@ -363,12 +364,12 @@ public void testNetworkStrategyRF22() new ReplicationFactor(ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "DC1", "2", "DC2", "2")), - Arrays.asList(new CassandraInstance("0", "local0-i1", "DC1"), - new CassandraInstance("100", "local0-i2", "DC1"), - new CassandraInstance("200", "local0-i3", "DC1"), - new CassandraInstance("1", "local1-i1", "DC2"), - new CassandraInstance("101", "local1-i2", "DC2"), - new CassandraInstance("201", "local1-i3", "DC2"))); + Arrays.asList(new CassandraInstance(Set.of("0"), "local0-i1", "DC1"), + new CassandraInstance(Set.of("100"), "local0-i2", "DC1"), + new CassandraInstance(Set.of("200"), "local0-i3", "DC1"), + new CassandraInstance(Set.of("1"), "local1-i1", "DC2"), + new CassandraInstance(Set.of("101"), "local1-i2", "DC2"), + new CassandraInstance(Set.of("201"), "local1-i3", "DC2"))); assertThat(ring.tokens().toArray()).isEqualTo(Arrays.asList(BigInteger.valueOf(0L), BigInteger.valueOf(1L), diff --git a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/utils/RangeUtilsTest.java b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/utils/RangeUtilsTest.java index 772eef521..f2e58cb83 100644 --- a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/utils/RangeUtilsTest.java +++ b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/utils/RangeUtilsTest.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -283,7 +284,7 @@ private static List getInstances(BigInteger[] tokens) List instances = new ArrayList<>(); for (int token = 0; token < tokens.length; token++) { - instances.add(new CassandraInstance(tokens[token].toString(), "node-" + token, "dc")); + instances.add(new CassandraInstance(Set.of(tokens[token].toString()), "node-" + token, "dc")); } return instances; } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java index ea8231a02..a70b21e9c 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java @@ -24,6 +24,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Objects; +import java.util.Set; import com.google.common.annotations.VisibleForTesting; @@ -74,9 +75,9 @@ public RingInstance(ReplicaMetadata replica) // Used only in tests @Override - public String token() + public Set tokens() { - return ringEntry.token(); + return Set.of(ringEntry.token()); } @Override diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 67b06dd6e..8c7377fc2 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -662,7 +662,7 @@ public CassandraRing createCassandraRingFromRing(Partitioner partitioner, Collection instances = ring .stream() .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter())) - .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter())) + .map(status -> new CassandraInstance(Set.of(status.token()), status.fqdn(), status.datacenter())) .collect(Collectors.toList()); return new CassandraRing(partitioner, keyspace, replicationFactor, instances); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java index d3200166d..3b8297155 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java @@ -452,8 +452,8 @@ public ReplicaSet add(CassandraInstance instance) { if (primary.size() < minReplicas) { - LOGGER.info("Selecting instance as primary replica nodeName={} token={} dc={} partitionId={}", - instance.nodeName(), instance.token(), instance.dataCenter(), partitionId); + LOGGER.info("Selecting instance as primary replica nodeName={} tokens={} dc={} partitionId={}", + instance.nodeName(), instance.tokens(), instance.dataCenter(), partitionId); return addPrimary(instance); } return addBackup(instance); @@ -487,8 +487,8 @@ public Set backup() public ReplicaSet addBackup(CassandraInstance instance) { - LOGGER.info("Selecting instance as backup replica nodeName={} token={} dc={} partitionId={}", - instance.nodeName(), instance.token(), instance.dataCenter(), partitionId); + LOGGER.info("Selecting instance as backup replica nodeName={} tokens={} dc={} partitionId={}", + instance.nodeName(), instance.tokens(), instance.dataCenter(), partitionId); backup.add(instance); return this; } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicas.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicas.java index 776e824f1..20598749f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicas.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicas.java @@ -118,14 +118,14 @@ private void openReplicaOrRetry( .whenComplete((readers, throwable) -> { if (throwable != null) { - LOGGER.warn("Failed to open SSTableReaders for replica node={} token={} dataCenter={}", - replica.instance().nodeName(), replica.instance().token(), replica.instance().dataCenter(), throwable); + LOGGER.warn("Failed to open SSTableReaders for replica node={} tokens={} dataCenter={}", + replica.instance().nodeName(), replica.instance().tokens(), replica.instance().dataCenter(), throwable); stats.failedToOpenReplica(replica, throwable); SingleReplica anotherReplica = otherReplicas.poll(); if (anotherReplica != null) { - LOGGER.warn("Retrying on another replica node={} token={} dataCenter={}", - anotherReplica.instance().nodeName(), anotherReplica.instance().token(), anotherReplica.instance().dataCenter()); + LOGGER.warn("Retrying on another replica node={} tokens={} dataCenter={}", + anotherReplica.instance().nodeName(), anotherReplica.instance().tokens(), anotherReplica.instance().dataCenter()); // If the failed replica was the repair primary we need the backup replacement replica to be the new repair primary anotherReplica.setIsRepairPrimary(replica.isRepairPrimary()); openReplicaOrRetry(anotherReplica, readerOpener, result, count, latch, otherReplicas); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/SingleReplica.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/SingleReplica.java index 1f7f8e736..891fec5d9 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/SingleReplica.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/SingleReplica.java @@ -143,8 +143,8 @@ CompletableFuture> openReplicaAsync(@NotNu } catch (Throwable throwable) { - LOGGER.warn("Unexpected error attempting to open SSTable readers for replica node={} token={} dataCenter={}", - instance().nodeName(), instance().token(), instance().dataCenter(), throwable); + LOGGER.warn("Unexpected error attempting to open SSTable readers for replica node={} tokens={} dataCenter={}", + instance().nodeName(), instance().tokens(), instance().dataCenter(), throwable); CompletableFuture> exceptionally = new CompletableFuture<>(); exceptionally.completeExceptionally(throwable); return exceptionally; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java index 62e76774b..3b970dca3 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import com.google.common.collect.ImmutableList; @@ -273,7 +274,7 @@ public void testCqlTable(CassandraBridge bridge) @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") public void testCassandraInstance(CassandraBridge bridge) { - CassandraInstance instance = new CassandraInstance("-9223372036854775807", "local1-i1", "DC1"); + CassandraInstance instance = new CassandraInstance(Set.of("-9223372036854775807"), "local1-i1", "DC1"); Output out = serialize(bridge.getVersion(), instance); CassandraInstance deserialized = deserialize(bridge.getVersion(), out, CassandraInstance.class); assertThat(deserialized).isNotNull(); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java index 99f05f8ea..c0339530b 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java @@ -346,7 +346,7 @@ public static Collection createInstances(Partitioner partitio BigInteger token = partitioner.minToken(); for (int instance = 0; instance < numInstances; instance++) { - instances.add(new CassandraInstance(token.toString(), "local-i" + instance, dataCenter)); + instances.add(new CassandraInstance(Set.of(token.toString()), "local-i" + instance, dataCenter)); token = token.add(split); assertThat(token.compareTo(partitioner.maxToken())).isLessThanOrEqualTo(0); } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/SingleReplicaTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/SingleReplicaTests.java index acaa4de24..f9a1f2858 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/SingleReplicaTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/SingleReplicaTests.java @@ -181,7 +181,7 @@ private static void runTest( FileType... missingFileTypes) throws InterruptedException, IOException, ExecutionException { PartitionedDataLayer dataLayer = mock(PartitionedDataLayer.class); - CassandraInstance instance = new CassandraInstance("-9223372036854775808", "local1-i1", "DC1"); + CassandraInstance instance = new CassandraInstance(Set.of("-9223372036854775808"), "local1-i1", "DC1"); SSTable ssTable1 = mockSSTable(); SSTable ssTable2 = mockSSTable(); From 88cb4dd3a36ca40d57053bb7f61fcae0354b28dd Mon Sep 17 00:00:00 2001 From: Bernardo Botella Corbi Date: Mon, 3 Nov 2025 17:32:07 +0100 Subject: [PATCH 2/2] Changes.txt --- CHANGES.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.txt b/CHANGES.txt index 28cd7e3f3..1be59d491 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.2.0 ----- + * CassandraInstance support for multiple tokens (CASSANALYTICS-100) * Expose SidecarCdc builders and interfaces (CASSANALYTICS-94) * Fix bulk reader node availability comparator ordering (CASSANALYTICS-99) * Remove not needed buffer flips (CASSANALYTICS-95)