diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java index 3d142f8df..f1b1e0d64 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java @@ -31,9 +31,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; import java.util.stream.Collectors; +import java.util.stream.IntStream; -import java.util.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; @@ -73,6 +75,7 @@ public class CassandraRing implements Serializable private String keyspace; private ReplicationFactor replicationFactor; private List instances; + private Map, List> rangeToReplicas; private transient RangeMap> replicas; private transient Multimap> tokenRangeMap; @@ -106,6 +109,49 @@ private static void addReplica(CassandraInstance replica, replicaMap.putAll(mappingsToAdd); } + /** + * Find the CassandraInstance which owns a given token from a list of + * __sorted__ CassandraInstances. If the token is greater than or equal to + * the partitioner max token then the CassandraInstance with the smallest + * token value is returned, assuming token ring wraparound. + */ + private static int getTokenOwnerIdx(List instances, Partitioner partitioner, BigInteger token) + { + OptionalInt maybeTokenOwnerIdx = IntStream.range(0, instances.size()) + .filter(idx -> new BigInteger(instances.get(idx).token()).equals(token)) + .findFirst(); + + if (maybeTokenOwnerIdx.isPresent()) + { + return maybeTokenOwnerIdx.getAsInt(); + } + else if (token.compareTo(partitioner.maxToken()) >= 0) + { + return 0; + } + else + { + throw new RuntimeException(String.format("Could not find instance for token: %s", token)); + } + } + + public CassandraRing(Partitioner partitioner, + String keyspace, + ReplicationFactor replicationFactor, + Collection instances, + Map, List> rangeToReplicas) + { + this.partitioner = partitioner; + this.keyspace = keyspace; + this.replicationFactor = replicationFactor; + this.instances = instances.stream() + .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) + .collect(Collectors.toCollection(ArrayList::new)); + this.rangeToReplicas = rangeToReplicas; + + this.initWithRangeToReplicas(); + } + public CassandraRing(Partitioner partitioner, String keyspace, ReplicationFactor replicationFactor, @@ -120,6 +166,32 @@ public CassandraRing(Partitioner partitioner, this.init(); } + private void initWithRangeToReplicas() + { + // replicas is a mapping of token ranges to CassandraInstances + // which are replicas for that range + replicas = TreeRangeMap.create(); + + // tokenRangeMap is mapping of CassandraInstances to token ranges + // that they own or are a replica for + tokenRangeMap = ArrayListMultimap.create(); + + rangeToReplicas.forEach((range, rangeReplicas) -> { + int tokenOwnerIdx = getTokenOwnerIdx(this.instances, partitioner, range.upperEndpoint()); + + // Find following closest CassandraInstance for each replica and update tokenRangeMap + rangeReplicas.forEach(replica -> + IntStream.range(0, this.instances.size()) + .map(idx -> (idx + tokenOwnerIdx) % this.instances.size()) + .filter(idx -> this.instances.get(idx).nodeName().equals(replica)) + .findFirst() + .ifPresent(idx -> tokenRangeMap.get(this.instances.get(idx)).add(range))); + }); + + replicas.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), Collections.emptyList()); + tokenRangeMap.asMap().forEach((instance, ranges) -> ranges.forEach(range -> addReplica(instance, range, replicas))); + } + private void init() { // Setup token range map @@ -278,7 +350,25 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE { this.instances.add(new CassandraInstance(in.readUTF(), in.readUTF(), in.readUTF())); } - this.init(); + + this.rangeToReplicas = new HashMap<>(); + int numKeys = in.readShort(); + for (int key = 0; key < numKeys; key++) + { + BigInteger rangeStart = new BigInteger(in.readUTF()); + BigInteger rangeEnd = new BigInteger(in.readUTF()); + List replicas = new ArrayList<>(); + + int numReplicas = in.readShort(); + for (int replica = 0; replica < numReplicas; replica++) + { + replicas.add(in.readUTF()); + } + + this.rangeToReplicas.put(Range.openClosed(rangeStart, rangeEnd), replicas); + } + + this.initWithRangeToReplicas(); } private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException @@ -303,6 +393,19 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFou out.writeUTF(instance.nodeName()); out.writeUTF(instance.dataCenter()); } + + out.writeShort(this.rangeToReplicas.size()); + for (Map.Entry, List> entry : rangeToReplicas.entrySet()) + { + out.writeUTF(entry.getKey().lowerEndpoint().toString()); + out.writeUTF(entry.getKey().upperEndpoint().toString()); + + out.writeShort(entry.getValue().size()); + for (String replica : entry.getValue()) + { + out.writeUTF(replica); + } + } } public static class Serializer extends com.esotericsoftware.kryo.Serializer @@ -314,6 +417,7 @@ public void write(Kryo kryo, Output out, CassandraRing ring) out.writeString(ring.keyspace); kryo.writeObject(out, ring.replicationFactor); kryo.writeObject(out, ring.instances); + kryo.writeObject(out, ring.rangeToReplicas); } @Override @@ -324,7 +428,8 @@ public CassandraRing read(Kryo kryo, Input in, Class type) : Partitioner.Murmur3Partitioner, in.readString(), kryo.readObject(in, ReplicationFactor.class), - kryo.readObject(in, ArrayList.class)); + kryo.readObject(in, ArrayList.class), + kryo.readObject(in, Map.class)); } } } diff --git a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java index c66290f0f..10da8a5d5 100644 --- a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java +++ b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; @@ -67,6 +68,12 @@ private void validateRanges(Collection> ranges, excludedTokens.forEach(token -> assertThat(rangeSet.contains(token)).isFalse()); } + private Map.Entry, List> replicaEntry(String startToken, String endToken, List replicas) + { + Range range = Range.openClosed(new BigInteger(startToken), new BigInteger(endToken)); + return Map.entry(range, replicas); + } + @Test public void testSimpleStrategyRF3() { @@ -447,4 +454,164 @@ public void testNetworkStrategyRF22() Partitioner.Murmur3Partitioner.minToken(), Partitioner.Murmur3Partitioner.maxToken())); } + + @Test + public void testNetworkStrategyRF3Tokens1WithRangeToReplicas() + { + List instances = Arrays.asList( + new CassandraInstance("0", "local0-i1", "DC1"), + new CassandraInstance("100", "local0-i2", "DC1"), + new CassandraInstance(Partitioner.Murmur3Partitioner.maxToken().toString(), "local0-i3", "DC1")); + + List replicas = List.of("local0-i1", "local0-i2", "local0-i3"); + Map, List> rangeToReplicas = Map.ofEntries( + replicaEntry(Partitioner.Murmur3Partitioner.minToken().toString(), "0", replicas), + replicaEntry("0", "100", replicas), + replicaEntry("100", Partitioner.Murmur3Partitioner.maxToken().toString(), replicas) + ); + + CassandraRing ring = new CassandraRing( + Partitioner.Murmur3Partitioner, + "test", + new ReplicationFactor( + ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "DC1", "3")), + instances, + rangeToReplicas); + + assertThat(ring.tokens().toArray()).isEqualTo(Arrays.asList( + BigInteger.valueOf(0L), + BigInteger.valueOf(100L), + Partitioner.Murmur3Partitioner.maxToken()).toArray()); + + Multimap> tokenRanges = ring.tokenRanges(); + for (CassandraInstance instance : instances) + { + assertThat(mergeRanges(tokenRanges.get(instance))) + .isEqualTo(Range.openClosed(Partitioner.Murmur3Partitioner.minToken(), + Partitioner.Murmur3Partitioner.maxToken())); + } + } + + @Test + public void testNetworkStrategyRF3Tokens4WithRangeToReplicas() + { + List instances = Arrays.asList( + new CassandraInstance("-8000", "local0-i1", "DC1"), + new CassandraInstance("-2000", "local0-i1", "DC1"), + new CassandraInstance("2000", "local0-i1", "DC1"), + new CassandraInstance("8000", "local0-i1", "DC1"), + new CassandraInstance("-6000", "local0-i2", "DC1"), + new CassandraInstance("-1000", "local0-i2", "DC1"), + new CassandraInstance("4000", "local0-i2", "DC1"), + new CassandraInstance("9000", "local0-i2", "DC1"), + new CassandraInstance("-4000", "local0-i3", "DC1"), + new CassandraInstance("-5", "local0-i3", "DC1"), + new CassandraInstance("3050", "local0-i3", "DC1"), + new CassandraInstance("10000", "local0-i3", "DC1")); + + List replicas = List.of("local0-i1", "local0-i2", "local0-i3"); + Map, List> rangeToReplicas = Map.ofEntries( + replicaEntry(Partitioner.Murmur3Partitioner.minToken().toString(), "-8000", replicas), + replicaEntry("-8000", "-6000", replicas), + replicaEntry("-6000", "-4000", replicas), + replicaEntry("-4000", "-2000", replicas), + replicaEntry("-2000", "-1000", replicas), + replicaEntry("-1000", "-5", replicas), + replicaEntry("-5", "2000", replicas), + replicaEntry("2000", "3050", replicas), + replicaEntry("3050", "4000", replicas), + replicaEntry("4000", "8000", replicas), + replicaEntry("8000", "9000", replicas), + replicaEntry("9000", "10000", replicas), + replicaEntry("10000", Partitioner.Murmur3Partitioner.maxToken().toString(), replicas) + ); + + CassandraRing ring = new CassandraRing( + Partitioner.Murmur3Partitioner, + "test", + new ReplicationFactor( + ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "DC1", "3")), + instances, + rangeToReplicas); + + assertThat(ring.tokens().toArray()).isEqualTo(Arrays.asList( + BigInteger.valueOf(-8000L), BigInteger.valueOf(-6000L), BigInteger.valueOf(-4000L), + BigInteger.valueOf(-2000L), BigInteger.valueOf(-1000L), BigInteger.valueOf(-5L), + BigInteger.valueOf(2000L), BigInteger.valueOf(3050L), BigInteger.valueOf(4000L), + BigInteger.valueOf(8000L), BigInteger.valueOf(9000L), BigInteger.valueOf(10000L)).toArray()); + + Multimap> tokenRanges = ring.tokenRanges(); + + // token(0) (-8000) => (MIN -> -8000], (8000 -> 9000], (9000 -> 10000], (10000 -> MAX] + // => (MIN -> -8000], (8000 -> MAX] + validateRanges(tokenRanges.get(instances.get(0)), + Arrays.asList(BigInteger.valueOf(-8000), Partitioner.Murmur3Partitioner.maxToken()), + Arrays.asList(Partitioner.Murmur3Partitioner.minToken(), BigInteger.valueOf(8000))); + + // token(2) (-2000) => (-8000 -> -6000], (-6000 -> -4000], (-4000 -> -2000] + // => (-8000 -> -2000] + validateRanges(tokenRanges.get(instances.get(1)), + Arrays.asList(BigInteger.valueOf(-2000)), + Arrays.asList(BigInteger.valueOf(-8000))); + + // token(3) (2000) => (-2000 -> -1000], (-1000 -> -5], (-5 -> 2000] + // => (-2000 -> 2000] + validateRanges(tokenRanges.get(instances.get(2)), + Arrays.asList(BigInteger.valueOf(2000)), + Arrays.asList(BigInteger.valueOf(-2000))); + + // token(4) (8000) => (2000 -> 3050], (3050 -> 4000], (4000 -> 8000] + // => (2000 -> 8000] + validateRanges(tokenRanges.get(instances.get(3)), + Arrays.asList(BigInteger.valueOf(8000)), + Arrays.asList(BigInteger.valueOf(2000))); + + // token(5) (-6000) => (MIN -> -8000], (-8000 -> -6000], (9000 -> 10000], (10000 -> MAX] + // => (MIN -> -6000], (9000 -> MAX) + validateRanges(tokenRanges.get(instances.get(4)), + Arrays.asList(BigInteger.valueOf(-6000), Partitioner.Murmur3Partitioner.maxToken()), + Arrays.asList(Partitioner.Murmur3Partitioner.minToken(), BigInteger.valueOf(9000))); + + // token(6) (-1000) => (-6000 -> -4000], (-4000 -> -2000], (-2000 -> -1000] + // => (-6000 -> -1000] + validateRanges(tokenRanges.get(instances.get(5)), + Arrays.asList(BigInteger.valueOf(-1000)), + Arrays.asList(BigInteger.valueOf(-6000))); + + // token(7) (4000) => (-1000 -> -5], (-5 -> 2000], (2000 -> 3050], (3050 -> 4000] + // => (-1000 -> 4000] + validateRanges(tokenRanges.get(instances.get(6)), + Arrays.asList(BigInteger.valueOf(4000)), + Arrays.asList(BigInteger.valueOf(-1000))); + + // token(8) (9000) => (4000 -> 8000], (8000 -> 9000] + // => (4000 -> 9000] + validateRanges(tokenRanges.get(instances.get(7)), + Arrays.asList(BigInteger.valueOf(9000)), + Arrays.asList(BigInteger.valueOf(4000))); + + // token(9) (-4000) => (MIN -> -8000], (-8000 -> -6000], (-6000 -> -4000], (10000 -> MAX] + // => (MIN -> -4000], (10000 -> -MAX] + validateRanges(tokenRanges.get(instances.get(8)), + Arrays.asList(BigInteger.valueOf(-4000), Partitioner.Murmur3Partitioner.maxToken()), + Arrays.asList(Partitioner.Murmur3Partitioner.minToken(), BigInteger.valueOf(10000))); + + // token(10) (-5) => (-4000 -> -2000], (-2000 -> -1000], (-1000 -> -5] + // => (-4000 -> -5] + validateRanges(tokenRanges.get(instances.get(9)), + Arrays.asList(BigInteger.valueOf(-5)), + Arrays.asList(BigInteger.valueOf(-4000))); + + // token(11) (3050) => (-5 -> 2000], (2000 -> 3050] + // => (-5 -> 3050] + validateRanges(tokenRanges.get(instances.get(10)), + Arrays.asList(BigInteger.valueOf(3050)), + Arrays.asList(BigInteger.valueOf(-5))); + + // token(12) (10000) => (3050 -> 4000], (4000 -> 8000], (8000 -> 9000], (9000 -> 10000] + // => (3050 -> 10000] + validateRanges(tokenRanges.get(instances.get(11)), + Arrays.asList(BigInteger.valueOf(10000)), + Arrays.asList(BigInteger.valueOf(3050))); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 557aaf2cb..b20537be9 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -62,6 +62,7 @@ import o.a.c.sidecar.client.shaded.common.response.NodeSettings; import o.a.c.sidecar.client.shaded.common.response.RingResponse; import o.a.c.sidecar.client.shaded.common.response.SchemaResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.analytics.stats.Stats; import org.apache.cassandra.bridge.BigNumberConfig; import org.apache.cassandra.bridge.BigNumberConfigImpl; @@ -300,7 +301,10 @@ private int initBulkReader(@NotNull ClientConfig options) throws ExecutionExcept udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt)); cqlTable = bridge().buildSchema(createStmt, keyspace, replicationFactor, partitioner, udts, null, indexCount, false); - CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get()); + + CompletableFuture tokenRangeReplicasFuture = sidecar.tokenRangeReplicas( + new ArrayList<>(clusterConfig), maybeQuotedKeyspace); + CassandraRing ring = createCassandraRingFromTokenRangeReplicas(partitioner, replicationFactor, ringFuture.get(), tokenRangeReplicasFuture.get()); int effectiveNumberOfCores = sizingFuture.get(); tokenPartitioner = new TokenPartitioner(ring, options.defaultParallelism(), effectiveNumberOfCores); @@ -654,6 +658,7 @@ public BigNumberConfig bigNumberConfig(CqlField field) /* Internal Cassandra SSTable */ + // TODO(andrew.johnson): Remove @VisibleForTesting public CassandraRing createCassandraRingFromRing(Partitioner partitioner, ReplicationFactor replicationFactor, @@ -664,9 +669,27 @@ public CassandraRing createCassandraRingFromRing(Partitioner partitioner, .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter())) .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter())) .collect(Collectors.toList()); + return new CassandraRing(partitioner, keyspace, replicationFactor, instances); } + @VisibleForTesting + public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner partitioner, + ReplicationFactor replicationFactor, + RingResponse ring, + TokenRangeReplicasResponse tokenRangeReplicas) + { + Map, List> replicas = dcReplicasByRange(tokenRangeReplicas, datacenter); + + Collection instances = ring + .stream() + .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter())) + .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter())) + .collect(Collectors.toList()); + + return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicas); + } + // Startup Validation @Override @@ -1061,4 +1084,32 @@ public boolean fieldNullable() requestedFeatures.set(index, featureAlias); } } + + @VisibleForTesting + static Map, List> dcReplicasByRange(TokenRangeReplicasResponse tokenRangeReplicas, String datacenter) + { + Map fqdnByAddressAndPort = new HashMap<>(); + Map, List> replicas = new HashMap<>(); + + tokenRangeReplicas.replicaMetadata().forEach((addressAndPort, metadata) -> { + fqdnByAddressAndPort.putIfAbsent(addressAndPort, metadata.fqdn()); + }); + + tokenRangeReplicas.readReplicas().forEach(replicaInfo -> { + Range range = Range.openClosed(new BigInteger(replicaInfo.start()), new BigInteger(replicaInfo.end())); + List dcReplicas = replicaInfo.replicasByDatacenter() + .entrySet() + .stream() + .filter(entry -> datacenter.equalsIgnoreCase(entry.getKey())) + .findFirst() + .get() + .getValue() + .stream().map(fqdnByAddressAndPort::get) + .collect(Collectors.toList()); + + replicas.put(range, dcReplicas); + }); + + return replicas; + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java index 088de9070..b5bc014f5 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java @@ -19,14 +19,20 @@ package org.apache.cassandra.spark.data; +import java.math.BigInteger; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import static org.apache.cassandra.spark.data.CassandraDataLayer.dcReplicasByRange; import static org.assertj.core.api.Assertions.assertThat; class CassandraDataLayerTests @@ -64,4 +70,36 @@ void testClearSnapshotOptionSupport(Boolean clearSnapshot, String expectedClearS assertThat(clearSnapshotStrategy.hasTTL()).isEqualTo(expectedClearSnapshotStrategy.hasTTL()); assertThat(clearSnapshotStrategy.ttl()).isEqualTo(expectedClearSnapshotStrategy.ttl()); } + + @Test + void testDcReplicasByRangeMultiDC() + { + List readReplicas = List.of( + new TokenRangeReplicasResponse.ReplicaInfo("-5000", "5000", + Map.of( + "dc1", List.of("localhost1:9000", "localhost2:9001", "localhost3:9002"), + "dc2", List.of("localhost4:9003")))); + + Map replicaMetadata = Map.of( + "localhost1:9000", new TokenRangeReplicasResponse.ReplicaMetadata("Normal", "Up", "replica1-1", "localhost1", 9000, "dc1"), + "localhost2:9001", new TokenRangeReplicasResponse.ReplicaMetadata("Normal", "Up", "replica1-2", "localhost2", 9001, "dc1"), + "localhost3:9002", new TokenRangeReplicasResponse.ReplicaMetadata("Normal", "Up", "replica1-3", "localhost3", 9002, "dc1"), + "localhost4:9003", new TokenRangeReplicasResponse.ReplicaMetadata("Normal", "Up", "replica2-1", "localhost4", 9003, "dc2") + ); + + TokenRangeReplicasResponse response = + new TokenRangeReplicasResponse(Collections.EMPTY_LIST, readReplicas, replicaMetadata); + + Map, List> expectedDc1 = Map.of( + Range.openClosed(new BigInteger("-5000"), new BigInteger("5000")), List.of("replica1-1", "replica1-2", "replica1-3")); + Map, List> actualDc1 = dcReplicasByRange(response, "dc1"); + + assertThat(actualDc1).isEqualTo(expectedDc1); + + Map, List> expectedDc2 = Map.of( + Range.openClosed(new BigInteger("-5000"), new BigInteger("5000")), List.of("replica2-1")); + Map, List> actualDc2 = dcReplicasByRange(response, "dc2"); + + assertThat(actualDc2).isEqualTo(expectedDc2); + } }