Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkState;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -29,7 +30,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.dataImpl.KeyExtent;
Expand Down Expand Up @@ -105,6 +106,7 @@ public boolean equals(Object o) {
private final int maxSize;
private final AtomicLong rejectedJobs;
private final AtomicLong dequeuedJobs;
private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> futures;

private static class TabletJobs {
final long generation;
Expand All @@ -122,22 +124,17 @@ private TabletJobs(long generation, HashSet<CjpqKey> jobs) {

private final AtomicLong nextSeq = new AtomicLong(0);

private final AtomicBoolean closed = new AtomicBoolean(false);

public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
this.jobQueue = new TreeMap<>();
this.maxSize = maxSize;
this.tabletJobs = new HashMap<>();
this.groupId = groupId;
this.rejectedJobs = new AtomicLong(0);
this.dequeuedJobs = new AtomicLong(0);
this.futures = new ArrayDeque<>();
}

public synchronized void removeOlderGenerations(Ample.DataLevel level, long currGeneration) {
if (closed.get()) {
return;
}

List<KeyExtent> removals = new ArrayList<>();

tabletJobs.forEach((extent, jobs) -> {
Expand All @@ -160,16 +157,26 @@ public synchronized void removeOlderGenerations(Ample.DataLevel level, long curr
public synchronized int add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs,
long generation) {
Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId)));
if (closed.get()) {
return -1;
}

removePreviousSubmissions(tabletMetadata.getExtent());

HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());

int jobsAdded = 0;
for (CompactionJob job : jobs) {
outer: for (CompactionJob job : jobs) {
var future = futures.poll();
while (future != null) {
// its expected that if futures are present then the queue is empty, if this is not true
// then there is a bug
Preconditions.checkState(jobQueue.isEmpty());
if (future.complete(new CompactionJobQueues.MetaJob(job, tabletMetadata))) {
// successfully completed a future with this job, so do not need to queue the job
jobsAdded++;
continue outer;
} // else the future was canceled or timed out so could not complete it
future = futures.poll();
}

CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job);
if (cjqpKey != null) {
checkState(newEntries.add(cjqpKey));
Expand Down Expand Up @@ -227,25 +234,25 @@ public synchronized CompactionJobQueues.MetaJob poll() {
return first == null ? null : first.getValue();
}

public synchronized CompletableFuture<CompactionJobQueues.MetaJob> getAsync() {
var job = jobQueue.pollFirstEntry();
if (job != null) {
return CompletableFuture.completedFuture(job.getValue());
}

// There is currently nothing in the queue, so create an uncompleted future and queue it up to
// be completed when something does arrive.
CompletableFuture<CompactionJobQueues.MetaJob> future = new CompletableFuture<>();
futures.add(future);
return future;
}

// exists for tests
synchronized CompactionJobQueues.MetaJob peek() {
var firstEntry = jobQueue.firstEntry();
return firstEntry == null ? null : firstEntry.getValue();
}

public boolean isClosed() {
return closed.get();
}

public synchronized boolean closeIfEmpty() {
if (jobQueue.isEmpty()) {
closed.set(true);
return true;
}

return false;
}

private void removePreviousSubmissions(KeyExtent extent) {
TabletJobs prevJobs = tabletJobs.get(extent);
if (prevJobs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -34,8 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

public class CompactionJobQueues {

private static final Logger log = LoggerFactory.getLogger(CompactionJobQueues.class);
Expand Down Expand Up @@ -157,23 +156,25 @@ public TabletMetadata getTabletMetadata() {
}
}

/**
* Asynchronously get a compaction job from the queue. If the queue currently has jobs then a
* completed future will be returned containing the highest priority job in the queue. If the
* queue is currently empty, then an uncompleted future will be returned and later when something
* is added to the queue the future will be completed.
*/
public CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) {
var pq = priorityQueues.computeIfAbsent(groupId,
gid -> new CompactionJobPriorityQueue(gid, queueSize));
return pq.getAsync();
}

public MetaJob poll(CompactorGroupId groupId) {
var prioQ = priorityQueues.get(groupId);
if (prioQ == null) {
return null;
}
MetaJob mj = prioQ.poll();

if (mj == null) {
priorityQueues.computeIfPresent(groupId, (eid, pq) -> {
if (pq.closeIfEmpty()) {
return null;
} else {
return pq;
}
});
}
return mj;

return prioQ.poll();
}

private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId,
Expand All @@ -187,14 +188,7 @@ private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId,

var pq = priorityQueues.computeIfAbsent(groupId,
gid -> new CompactionJobPriorityQueue(gid, queueSize));
while (pq.add(tabletMetadata, jobs,
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) {
// When entering this loop its expected the queue is closed
Preconditions.checkState(pq.isClosed());
// This loop handles race condition where poll() closes empty priority queues. The queue could
// be closed after its obtained from the map and before add is called.
pq = priorityQueues.computeIfAbsent(groupId,
gid -> new CompactionJobPriorityQueue(gid, queueSize));
}
pq.add(tabletMetadata, jobs,
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
package org.apache.accumulo.manager.compaction.queue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -209,58 +207,6 @@ public void testAddMoreThanMax() {

}

@Test
public void testAddAfterClose() {

CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class);
CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class);
CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class);
CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class);

KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a"));
TabletMetadata tm = EasyMock.createMock(TabletMetadata.class);
EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes();

CompactionJob cj1 = EasyMock.createMock(CompactionJob.class);
EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes();
EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes();
EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes();

CompactionJob cj2 = EasyMock.createMock(CompactionJob.class);
EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes();
EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes();
EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes();

EasyMock.replay(tm, cj1, cj2);

CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2);
assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));

assertFalse(queue.closeIfEmpty());

EasyMock.verify(tm, cj1, cj2);

assertEquals(5L, queue.getLowestPriority());
assertEquals(2, queue.getMaxSize());
assertEquals(0, queue.getDequeuedJobs());
assertEquals(0, queue.getRejectedJobs());
assertEquals(2, queue.getQueuedJobs());
MetaJob job = queue.poll();
assertEquals(cj1, job.getJob());
assertEquals(tm, job.getTabletMetadata());
assertEquals(1, queue.getDequeuedJobs());

MetaJob job2 = queue.poll();
assertEquals(cj2, job2.getJob());
assertEquals(tm, job2.getTabletMetadata());
assertEquals(2, queue.getDequeuedJobs());

assertTrue(queue.closeIfEmpty());

assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L));

}

private static int counter = 1;

private Pair<TabletMetadata,CompactionJob> createJob() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.accumulo.manager.compaction.queue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -332,4 +334,57 @@ public void testAddPollRaceCondition() throws Exception {
// The background threads should have seen every job that was added
assertEquals(numToAdd, totalSeen);
}

@Test
public void testGetAsync() throws Exception {
CompactionJobQueues jobQueues = new CompactionJobQueues(100);

var tid = TableId.of("1");
var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
var extent2 = new KeyExtent(tid, new Text("q"), new Text("l"));
var extent3 = new KeyExtent(tid, new Text("l"), new Text("c"));
var extent4 = new KeyExtent(tid, new Text("c"), new Text("a"));

var tm1 = TabletMetadata.builder(extent1).build();
var tm2 = TabletMetadata.builder(extent2).build();
var tm3 = TabletMetadata.builder(extent3).build();
var tm4 = TabletMetadata.builder(extent4).build();

var cg1 = CompactorGroupId.of("CG1");

var future1 = jobQueues.getAsync(cg1);
var future2 = jobQueues.getAsync(cg1);

assertFalse(future1.isDone());
assertFalse(future2.isDone());

jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));

var future3 = jobQueues.getAsync(cg1);
var future4 = jobQueues.getAsync(cg1);

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
assertTrue(future4.isDone());

assertEquals(extent1, future1.get().getTabletMetadata().getExtent());
assertEquals(extent2, future2.get().getTabletMetadata().getExtent());
assertEquals(extent4, future3.get().getTabletMetadata().getExtent());
assertEquals(extent3, future4.get().getTabletMetadata().getExtent());

// test cancelling a future
var future5 = jobQueues.getAsync(cg1);
assertFalse(future5.isDone());
future5.cancel(false);
var future6 = jobQueues.getAsync(cg1);
assertFalse(future6.isDone());
// since future5 was canceled, this addition should go to future6
jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
assertTrue(future6.isDone());
assertEquals(extent1, future6.get().getTabletMetadata().getExtent());
}
}