Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,10 @@ transparent_data_encryption_options:
# SAFETY THRESHOLDS #
#####################

# Log the query if it reads more than
# sstables_per_read_log_threshold SSTables
sstables_per_read_log_threshold: 50

# When executing a scan, within or across a partition, we need to keep the
# tombstones seen in memory so we can return them to the coordinator, which
# will use them to make sure other replicas also know about the deleted rows.
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ public static class SSTableConfig
public volatile DataStorageSpec.LongBytesBound row_index_read_size_warn_threshold = null;
public volatile DataStorageSpec.LongBytesBound row_index_read_size_fail_threshold = null;

public volatile int sstables_per_read_log_threshold = 50;
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;

Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3215,6 +3215,16 @@ public static int getMaxMutationSize()
return conf.max_mutation_size.toBytes();
}

public static int getSSTablesPerReadLogThreshold()
{
return conf.sstables_per_read_log_threshold;
}

public static void setSSTablesPerReadLogThreshold(int threshold)
{
conf.sstables_per_read_log_threshold = threshold;
}

public static int getTombstoneWarnThreshold()
{
return conf.tombstone_warn_threshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.NoSpamLogger;

/**
* A read command that selects a (part of a) range of partitions.
*/
public class PartitionRangeReadCommand extends ReadCommand implements PartitionRangeReadQuery
{
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();

protected final Slices requestedSlices;
Expand Down Expand Up @@ -434,6 +436,11 @@ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, Rea

final int finalSelectedSSTables = selectedSSTablesCnt;

if (finalSelectedSSTables > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
{
noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), finalSelectedSSTables);
}

// iterators can be empty for offline tools
if (inputCollector.isEmpty())
return EmptyIterators.unfilteredPartition(metadata());
Expand Down
17 changes: 16 additions & 1 deletion src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,14 @@
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.btree.BTreeSet;
import org.apache.cassandra.utils.NoSpamLogger;

/**
* A read command that selects a (part of a) single partition.
*/
public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery
{
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
protected static final Function<Seekable, SelectionDeserializer> accordSelectionDeserializer = AccordDeserializer::new;

Expand Down Expand Up @@ -876,7 +878,15 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());

List<UnfilteredRowIterator> iterators = inputCollector.finalizeIterators(cfs, nowInSec(), controller.oldestUnrepairedTombstone());
return withSSTablesIterated(iterators, cfs.metric, metricsCollector);

UnfilteredRowIterator result = withSSTablesIterated(iterators, cfs.metric, metricsCollector);

if (metricsCollector.getMergedSSTables() > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
{
noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), metricsCollector.getMergedSSTables());
}

return result;
}
catch (RuntimeException | Error e)
{
Expand Down Expand Up @@ -1084,6 +1094,11 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam

cfs.metric.updateSSTableIterated(metricsCollector.getMergedSSTables());

if (metricsCollector.getMergedSSTables() > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
{
noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), metricsCollector.getMergedSSTables());
}

if (result == null || result.isEmpty())
return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);

Expand Down
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4709,6 +4709,17 @@ public void setInvalidateKeycacheOnSSTableDeletion(boolean invalidate)
DatabaseDescriptor.setInvalidateKeycacheOnSSTableDeletion(invalidate);
}

public int getSSTablesPerReadLogThreshold()
{
return DatabaseDescriptor.getSSTablesPerReadLogThreshold();
}

public void setSSTablesPerReadLogThreshold(int threshold)
{
DatabaseDescriptor.setSSTablesPerReadLogThreshold(threshold);
logger.info("updated sstables_per_read_log_threshold to {}", threshold);
}

public int getTombstoneWarnThreshold()
{
return DatabaseDescriptor.getTombstoneWarnThreshold();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,11 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion,
/** Returns the cluster partitioner */
public String getPartitionerName();

/** Returns the threshold for logging queries that read more than threshold amount of SSTables */
public int getSSTablesPerReadLogThreshold();
/** Sets the threshold for logging queries that read more than threshold amount of SSTables */
public void setSSTablesPerReadLogThreshold(int threshold);

/** Returns the threshold for warning of queries with many tombstones */
public int getTombstoneWarnThreshold();
/** Sets the threshold for warning queries with many tombstones */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test;

import org.junit.Test;
import static org.junit.Assert.assertEquals;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;

public class SSTableReadLogsQueryTest extends TestBaseImpl
{
@Test
public void logQueryTest() throws Throwable
{
try (Cluster cluster = init(Cluster.build(1)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
.start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v counter)");

cluster.get(1).runOnInstance(() -> {
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
});

for (int i = 0; i <= 50; i++)
{
cluster.get(1).executeInternal("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 2");
cluster.get(1).flush(withKeyspace("%s"));
}

cluster.get(1).runOnInstance(() -> {
assertEquals(51, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
});

String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 2";
cluster.get(1).executeInternalWithResult(query);

assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
}
}

@Test
public void setSSTablesPerReadLogThresholdTest() throws Throwable
{
try (Cluster cluster = init(Cluster.build(1)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
.start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v counter)");

cluster.get(1).runOnInstance(() -> {
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
});

cluster.get(1).runOnInstance(() -> {
DatabaseDescriptor.setSSTablesPerReadLogThreshold(25);
});

for (int i = 0; i <= 25; i++)
{
cluster.get(1).executeInternal("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 2");
cluster.get(1).flush(withKeyspace("%s"));
}

cluster.get(1).runOnInstance(() -> {
assertEquals(26, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
});

String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 2";
cluster.get(1).executeInternalWithResult(query);

assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
}
}

@Test
public void logRangeReadQueryTest() throws Throwable
{
try (Cluster cluster = init(Cluster.build(1)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
.start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v int)");

cluster.get(1).runOnInstance(() -> {
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
});

for (int i = 0; i <= 50; i++)
{
cluster.get(1).executeInternal(String.format("INSERT INTO " + KEYSPACE + ".tbl (pk, v) VALUES (%s, %s)", i, i));
cluster.get(1).flush(withKeyspace("%s"));
}

cluster.get(1).runOnInstance(() -> {
assertEquals(51, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
});

String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk >= 0 AND pk < 51 ALLOW FILTERING";
cluster.get(1).executeInternalWithResult(query);

assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
}
}
}