Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ private List<Collection<CompactableFile>> findFilesToCompactWithLowerRatio(

private static short createPriority(PlanningParameters params,
Collection<CompactableFile> group) {
return CompactionJobPrioritizer.createPriority(params.getKind(), params.getAll().size(),
group.size());
return CompactionJobPrioritizer.createPriority(params.getTableId(), params.getKind(),
params.getAll().size(), group.size());
}

private long getMaxSizeToCompact(CompactionKind kind) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,77 @@

import java.util.Comparator;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;

import com.google.common.base.Preconditions;

public class CompactionJobPrioritizer {

public static final Comparator<CompactionJob> JOB_COMPARATOR =
Comparator.comparingInt(CompactionJob::getPriority)
.thenComparingInt(job -> job.getFiles().size()).reversed();

public static short createPriority(CompactionKind kind, int totalFiles, int compactingFiles) {
private static final short ROOT_USER_MAX = Short.MAX_VALUE;
private static final short ROOT_USER_MIN = ROOT_USER_MAX - 1000;
private static final short ROOT_SYSTEM_MAX = ROOT_USER_MIN - 1;
private static final short ROOT_SYSTEM_MIN = ROOT_SYSTEM_MAX - 1000;
private static final short METADATA_USER_MAX = ROOT_SYSTEM_MIN - 1;
private static final short METADATA_USER_MIN = METADATA_USER_MAX - 1000;
private static final short METADATA_SYSTEM_MAX = METADATA_USER_MIN - 1;
private static final short METADATA_SYSTEM_MIN = METADATA_SYSTEM_MAX - 1000;
private static final short USER_USER_MAX = METADATA_SYSTEM_MIN - 1;
private static final short USER_USER_MIN = USER_USER_MAX - 30768;
private static final short USER_SYSTEM_MAX = USER_USER_MIN - 1;
private static final short USER_SYSTEM_MIN = Short.MIN_VALUE;

int prio = totalFiles + compactingFiles;
public static short createPriority(TableId tableId, CompactionKind kind, int totalFiles,
int compactingFiles) {

switch (kind) {
case USER:
// user-initiated compactions will have a positive priority
// based on number of files
if (prio > Short.MAX_VALUE) {
return Short.MAX_VALUE;
Preconditions.checkArgument(totalFiles >= 0, "totalFiles is negative %s", totalFiles);
Preconditions.checkArgument(compactingFiles >= 0, "compactingFiles is negative %s",
compactingFiles);

int min;
int max;
// This holds the two bits used to encode the priority of the table.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the work in #4133 change anything with this compaction priority calculation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it could. We could add the table id from that work into this calculation once its merged

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two bits for the table and only three of those fours slots currently being used. Can use the 4th slot for #4133. If this is merged before #4133, may need to open an issue.

int tablePrefix;

switch (Ample.DataLevel.of(tableId)) {
case ROOT:
if (kind == CompactionKind.USER) {
min = ROOT_USER_MIN;
max = ROOT_USER_MAX;
} else {
min = ROOT_SYSTEM_MIN;
max = ROOT_SYSTEM_MAX;
}
return (short) prio;
case SELECTOR:
case SYSTEM:
// system-initiated compactions will have a negative priority
// starting at -32768 and increasing based on number of files
// maxing out at -1
if (prio > Short.MAX_VALUE) {
return -1;
break;
case METADATA:
if (kind == CompactionKind.USER) {
min = METADATA_USER_MIN;
max = METADATA_USER_MAX;
} else {
return (short) (Short.MIN_VALUE + prio);
min = METADATA_SYSTEM_MIN;
max = METADATA_SYSTEM_MAX;
}
break;
case USER:
if (kind == CompactionKind.USER) {
min = USER_USER_MIN;
max = USER_USER_MAX;
} else {
min = USER_SYSTEM_MIN;
max = USER_SYSTEM_MAX;
}
break;
default:
throw new AssertionError("Unknown kind " + kind);
throw new IllegalStateException("Unknown data level" + Ample.DataLevel.of(tableId));
}

return (short) Math.min(max, min + totalFiles + compactingFiles);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.accumulo.core.spi.compaction.CompactionPlanner.InitParameters;
import org.apache.accumulo.core.util.ConfigurationImpl;
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
Expand Down Expand Up @@ -201,6 +202,8 @@ public void testUserCompaction() {
var job = getOnlyElement(plan.getJobs());
assertEquals(candidates, job.getFiles());
assertEquals(CompactorGroupIdImpl.groupId("medium"), job.getGroup());
assertEquals(CompactionJobPrioritizer.createPriority(TableId.of("42"), CompactionKind.USER,
all.size(), job.getFiles().size()), job.getPriority());

// should only run one user compaction at a time
compacting = Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "3M", "F2", "3M")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.accumulo.core.util.compaction;

import static org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer.createPriority;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.ArrayList;
Expand All @@ -28,6 +31,9 @@
import java.util.Optional;

import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.junit.jupiter.api.Test;
Expand All @@ -42,29 +48,103 @@ public CompactionJob createJob(CompactionKind kind, String tablet, int numFiles,
.create(URI.create("hdfs://foonn/accumulo/tables/5/" + tablet + "/" + i + ".rf"), 4, 4));
}
// TODO pass numFiles
return new CompactionJobImpl(
CompactionJobPrioritizer.createPriority(kind, totalFiles, numFiles),
return new CompactionJobImpl(createPriority(TableId.of("1"), kind, totalFiles, numFiles),
CompactorGroupIdImpl.groupId("test"), files, kind, Optional.of(false));
}

@Test
public void testPrioritizer() throws Exception {
assertEquals((short) 0, CompactionJobPrioritizer.createPriority(CompactionKind.USER, 0, 0));
assertEquals((short) 10000,
CompactionJobPrioritizer.createPriority(CompactionKind.USER, 10000, 0));
assertEquals((short) 32767,
CompactionJobPrioritizer.createPriority(CompactionKind.USER, 32767, 0));
assertEquals((short) 32767,
CompactionJobPrioritizer.createPriority(CompactionKind.USER, Integer.MAX_VALUE, 0));

assertEquals((short) -32768,
CompactionJobPrioritizer.createPriority(CompactionKind.SYSTEM, 0, 0));
assertEquals((short) -22768,
CompactionJobPrioritizer.createPriority(CompactionKind.SYSTEM, 10000, 0));
assertEquals((short) -1,
CompactionJobPrioritizer.createPriority(CompactionKind.SYSTEM, 32767, 0));
assertEquals((short) -1,
CompactionJobPrioritizer.createPriority(CompactionKind.SYSTEM, Integer.MAX_VALUE, 0));
public void testOrdering() {
short pr1 = createPriority(RootTable.ID, CompactionKind.USER, 10000, 1);
assertEquals(Short.MAX_VALUE, pr1);
short pr2 = createPriority(RootTable.ID, CompactionKind.USER, 100, 30);
assertTrue(pr1 > pr2);
short pr3 = createPriority(RootTable.ID, CompactionKind.USER, 100, 1);
assertTrue(pr2 > pr3);
short pr4 = createPriority(RootTable.ID, CompactionKind.USER, 1, 1);
assertTrue(pr3 > pr4);
short pr5 = createPriority(RootTable.ID, CompactionKind.SYSTEM, 10000, 1);
assertTrue(pr4 > pr5);
short pr6 = createPriority(RootTable.ID, CompactionKind.SYSTEM, 100, 30);
assertTrue(pr5 > pr6);
short pr7 = createPriority(RootTable.ID, CompactionKind.SYSTEM, 100, 1);
assertTrue(pr6 > pr7);
short pr8 = createPriority(RootTable.ID, CompactionKind.SYSTEM, 1, 1);
assertTrue(pr7 > pr8);

short pm1 = createPriority(MetadataTable.ID, CompactionKind.USER, 10000, 1);
assertTrue(pr8 > pm1);
short pm2 = createPriority(MetadataTable.ID, CompactionKind.USER, 100, 30);
assertTrue(pm1 > pm2);
short pm3 = createPriority(MetadataTable.ID, CompactionKind.USER, 100, 1);
assertTrue(pm2 > pm3);
short pm4 = createPriority(MetadataTable.ID, CompactionKind.USER, 1, 1);
assertTrue(pm3 > pm4);
short pm5 = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 10000, 1);
assertTrue(pm4 > pm5);
short pm6 = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 100, 30);
assertTrue(pm5 > pm6);
short pm7 = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 100, 1);
assertTrue(pm6 > pm7);
short pm8 = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 1, 1);
assertTrue(pm7 > pm8);

var userTable1 = TableId.of("1");
var userTable2 = TableId.of("2");

short pu1 = createPriority(userTable1, CompactionKind.USER, 10000, 1);
assertTrue(pm8 > pu1);
short pu2 = createPriority(userTable2, CompactionKind.USER, 1000, 30);
assertTrue(pu1 > pu2);
short pu3 = createPriority(userTable1, CompactionKind.USER, 1000, 1);
assertTrue(pu2 > pu3);
short pu4 = createPriority(userTable2, CompactionKind.USER, 1, 1);
assertTrue(pu3 > pu4);
short pu5 = createPriority(userTable1, CompactionKind.SYSTEM, 10000, 1);
assertTrue(pu4 > pu5);
short pu6 = createPriority(userTable2, CompactionKind.SYSTEM, 1000, 30);
assertTrue(pu5 > pu6);
short pu7 = createPriority(userTable1, CompactionKind.SYSTEM, 1000, 1);
assertTrue(pu6 > pu7);
short pu8 = createPriority(userTable2, CompactionKind.SYSTEM, 1, 1);
assertTrue(pu7 > pu8);
assertEquals(Short.MIN_VALUE + 2, pu8);
}

@Test
public void testBoundary() {
var userTable = TableId.of("1");

short minRootUser = createPriority(RootTable.ID, CompactionKind.USER, 1, 1);
short minRootSystem = createPriority(RootTable.ID, CompactionKind.SYSTEM, 1, 1);
short minMetaUser = createPriority(MetadataTable.ID, CompactionKind.USER, 1, 1);
short minMetaSystem = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 1, 1);
short minUserUser = createPriority(userTable, CompactionKind.USER, 1, 1);

// Test the boundary condition around the max number of files to encode. Ensure the next level
// is always greater no matter how many files.
for (int files = 1; files < 100_000; files += 1) {
short rootSystem = createPriority(RootTable.ID, CompactionKind.SYSTEM, files, 1);
assertTrue(minRootUser > rootSystem);
short metaUser = createPriority(MetadataTable.ID, CompactionKind.USER, files, 1);
assertTrue(minRootSystem > metaUser);
short metaSystem = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, files, 1);
assertTrue(minMetaUser > metaSystem);
short userUser = createPriority(userTable, CompactionKind.USER, files, 1);
assertTrue(minMetaSystem > userUser);
short userSystem = createPriority(userTable, CompactionKind.SYSTEM, files, 1);
assertTrue(minUserUser > userSystem);
}

}

@Test
public void testNegative() {
for (var tableId : List.of(TableId.of("1"), TableId.of("2"), RootTable.ID, MetadataTable.ID)) {
for (var kind : CompactionKind.values()) {
assertThrows(IllegalArgumentException.class, () -> createPriority(tableId, kind, -5, 2));
assertThrows(IllegalArgumentException.class, () -> createPriority(tableId, kind, 10, -5));
}
}
}

@Test
Expand Down