diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 106f5e01e25..a6d0346b996 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1990,6 +1990,10 @@ transparent_data_encryption_options:
 # SAFETY THRESHOLDS #
 #####################

+# Log the query if it reads more than
+# sstables_per_read_log_threshold SSTables
+sstables_per_read_log_threshold: 50
+
 # When executing a scan, within or across a partition, we need to keep the
 # tombstones seen in memory so we can return them to the coordinator, which
 # will use them to make sure other replicas also know about the deleted rows.
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 986346da415..771bd38e6c6 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -579,6 +579,7 @@ public static class SSTableConfig
     public volatile DataStorageSpec.LongBytesBound row_index_read_size_warn_threshold = null;
     public volatile DataStorageSpec.LongBytesBound row_index_read_size_fail_threshold = null;

+    public volatile int sstables_per_read_log_threshold = 50;
     public volatile int tombstone_warn_threshold = 1000;
     public volatile int tombstone_failure_threshold = 100000;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 23e0aa83418..0c4eb444961 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3215,6 +3215,16 @@ public static int getMaxMutationSize()
         return conf.max_mutation_size.toBytes();
     }

+    public static int getSSTablesPerReadLogThreshold()
+    {
+        return conf.sstables_per_read_log_threshold;
+    }
+
+    public static void setSSTablesPerReadLogThreshold(int threshold)
+    {
+        conf.sstables_per_read_log_threshold = threshold;
+    }
+
     public static int getTombstoneWarnThreshold()
     {
         return conf.tombstone_warn_threshold;
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index f448bcf8724..9167c8daae1 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -58,12 +58,14 @@ import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.NoSpamLogger;

 /**
  * A read command that selects a (part of a) range of partitions.
  */
 public class PartitionRangeReadCommand extends ReadCommand implements PartitionRangeReadQuery
 {
+    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();

     protected final Slices requestedSlices;
@@ -434,6 +436,11 @@ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, Rea

             final int finalSelectedSSTables = selectedSSTablesCnt;

+            if (finalSelectedSSTables > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
+            {
+                noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), finalSelectedSSTables);
+            }
+
             // iterators can be empty for offline tools
             if (inputCollector.isEmpty())
                 return EmptyIterators.unfilteredPartition(metadata());
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 780a8c083f5..4dbaa3855e4 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -87,12 +87,14 @@ import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.NoSpamLogger;

 /**
  * A read command that selects a (part of a) single partition.
  */
 public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery
 {
+    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
     protected static final Function accordSelectionDeserializer = AccordDeserializer::new;
@@ -876,7 +878,15 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs

             StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
             List iterators = inputCollector.finalizeIterators(cfs, nowInSec(), controller.oldestUnrepairedTombstone());
-            return withSSTablesIterated(iterators, cfs.metric, metricsCollector);
+
+            UnfilteredRowIterator result = withSSTablesIterated(iterators, cfs.metric, metricsCollector);
+
+            if (metricsCollector.getMergedSSTables() > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
+            {
+                noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), metricsCollector.getMergedSSTables());
+            }
+
+            return result;
         }
         catch (RuntimeException | Error e)
         {
@@ -1084,6 +1094,11 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam

         cfs.metric.updateSSTableIterated(metricsCollector.getMergedSSTables());

+        if (metricsCollector.getMergedSSTables() > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
+        {
+            noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), metricsCollector.getMergedSSTables());
+        }
+
         if (result == null || result.isEmpty())
             return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a5703fe6018..3d5336d9829 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4709,6 +4709,17 @@ public void setInvalidateKeycacheOnSSTableDeletion(boolean invalidate)
         DatabaseDescriptor.setInvalidateKeycacheOnSSTableDeletion(invalidate);
     }

+    public int getSSTablesPerReadLogThreshold()
+    {
+        return DatabaseDescriptor.getSSTablesPerReadLogThreshold();
+    }
+
+    public void setSSTablesPerReadLogThreshold(int threshold)
+    {
+        DatabaseDescriptor.setSSTablesPerReadLogThreshold(threshold);
+        logger.info("updated sstables_per_read_log_threshold to {}", threshold);
+    }
+
     public int getTombstoneWarnThreshold()
     {
         return DatabaseDescriptor.getTombstoneWarnThreshold();
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index e5cbd980c4c..bb6c2010d65 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1027,6 +1027,11 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion,
     /** Returns the cluster partitioner */
     public String getPartitionerName();

+    /** Returns the threshold for logging queries that read more than this number of SSTables */
+    public int getSSTablesPerReadLogThreshold();
+    /** Sets the threshold for logging queries that read more than this number of SSTables */
+    public void setSSTablesPerReadLogThreshold(int threshold);
+
     /** Returns the threshold for warning of queries with many tombstones */
     public int getTombstoneWarnThreshold();
     /** Sets the threshold for warning queries with many tombstones */
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SSTableReadLogsQueryTest.java b/test/distributed/org/apache/cassandra/distributed/test/SSTableReadLogsQueryTest.java
new file mode 100644
index 00000000000..da2a6efcbb2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SSTableReadLogsQueryTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+
+public class SSTableReadLogsQueryTest extends TestBaseImpl
+{
+    @Test
+    public void logQueryTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(1)
+                                           .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           .start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v counter)");
+
+            cluster.get(1).runOnInstance(() -> {
+                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
+            });
+
+            for (int i = 0; i <= 50; i++)
+            {
+                cluster.get(1).executeInternal("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 2");
+                cluster.get(1).flush(withKeyspace("%s"));
+            }
+
+            cluster.get(1).runOnInstance(() -> {
+                assertEquals(51, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
+            });
+
+            String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 2";
+            cluster.get(1).executeInternalWithResult(query);
+
+            assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
+        }
+    }
+
+    @Test
+    public void setSSTablesPerReadLogThresholdTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(1)
+                                           .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           .start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v counter)");
+
+            cluster.get(1).runOnInstance(() -> {
+                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
+            });
+
+            cluster.get(1).runOnInstance(() -> {
+                DatabaseDescriptor.setSSTablesPerReadLogThreshold(25);
+            });
+
+            for (int i = 0; i <= 25; i++)
+            {
+                cluster.get(1).executeInternal("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 2");
+                cluster.get(1).flush(withKeyspace("%s"));
+            }
+
+            cluster.get(1).runOnInstance(() -> {
+                assertEquals(26, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
+            });
+
+            String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 2";
+            cluster.get(1).executeInternalWithResult(query);
+
+            assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
+        }
+    }
+
+    @Test
+    public void logRangeReadQueryTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(1)
+                                           .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           .start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v int)");
+
+            cluster.get(1).runOnInstance(() -> {
+                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
+            });
+
+            for (int i = 0; i <= 50; i++)
+            {
+                cluster.get(1).executeInternal(String.format("INSERT INTO " + KEYSPACE + ".tbl (pk, v) VALUES (%s, %s)", i, i));
+                cluster.get(1).flush(withKeyspace("%s"));
+            }
+
+            cluster.get(1).runOnInstance(() -> {
+                assertEquals(51, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
+            });
+
+            String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk >= 0 AND pk < 51 ALLOW FILTERING";
+            cluster.get(1).executeInternalWithResult(query);
+
+            assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
+        }
+    }
+}
\ No newline at end of file
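
A minimal usage sketch, not part of the patch above: it shows how the threshold could be adjusted at runtime through the StorageService MBean operations this change adds. The JMX endpoint (127.0.0.1:7199, unauthenticated) and the client class name are assumptions for illustration; org.apache.cassandra.db:type=StorageService is the name under which Cassandra registers the StorageService MBean.

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.cassandra.service.StorageServiceMBean;

// Hypothetical stand-alone client; adjust host, port and credentials for a real cluster.
public class SetSSTablesPerReadLogThreshold
{
    public static void main(String[] args) throws Exception
    {
        // 7199 is Cassandra's default JMX port; local, unauthenticated JMX is assumed here.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            StorageServiceMBean storageService =
                JMX.newMBeanProxy(connection,
                                  new ObjectName("org.apache.cassandra.db:type=StorageService"),
                                  StorageServiceMBean.class);

            // Read the current value, then lower it so reads touching more than 25 SSTables get logged.
            System.out.println("current sstables_per_read_log_threshold = " + storageService.getSSTablesPerReadLogThreshold());
            storageService.setSSTablesPerReadLogThreshold(25);
        }
    }
}

The same attribute is reachable from any generic JMX client (for example jconsole); this patch does not add a dedicated nodetool command for it.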