diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 4dfd6868ad4..9909ccb7f9a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -29,7 +30,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -105,6 +106,7 @@ public boolean equals(Object o) { private final int maxSize; private final AtomicLong rejectedJobs; private final AtomicLong dequeuedJobs; + private final ArrayDeque> futures; private static class TabletJobs { final long generation; @@ -122,8 +124,6 @@ private TabletJobs(long generation, HashSet jobs) { private final AtomicLong nextSeq = new AtomicLong(0); - private final AtomicBoolean closed = new AtomicBoolean(false); - public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { this.jobQueue = new TreeMap<>(); this.maxSize = maxSize; @@ -131,13 +131,10 @@ public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { this.groupId = groupId; this.rejectedJobs = new AtomicLong(0); this.dequeuedJobs = new AtomicLong(0); + this.futures = new ArrayDeque<>(); } public synchronized void removeOlderGenerations(Ample.DataLevel level, long currGeneration) { - if (closed.get()) { - return; - } - List removals = new ArrayList<>(); tabletJobs.forEach((extent, jobs) -> { @@ -160,16 +157,26 @@ public synchronized void removeOlderGenerations(Ample.DataLevel level, long curr public synchronized int add(TabletMetadata tabletMetadata, Collection jobs, long generation) { Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId))); - if (closed.get()) { - return -1; - } removePreviousSubmissions(tabletMetadata.getExtent()); HashSet newEntries = new HashSet<>(jobs.size()); int jobsAdded = 0; - for (CompactionJob job : jobs) { + outer: for (CompactionJob job : jobs) { + var future = futures.poll(); + while (future != null) { + // its expected that if futures are present then the queue is empty, if this is not true + // then there is a bug + Preconditions.checkState(jobQueue.isEmpty()); + if (future.complete(new CompactionJobQueues.MetaJob(job, tabletMetadata))) { + // successfully completed a future with this job, so do not need to queue the job + jobsAdded++; + continue outer; + } // else the future was canceled or timed out so could not complete it + future = futures.poll(); + } + CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job); if (cjqpKey != null) { checkState(newEntries.add(cjqpKey)); @@ -227,25 +234,25 @@ public synchronized CompactionJobQueues.MetaJob poll() { return first == null ? null : first.getValue(); } + public synchronized CompletableFuture getAsync() { + var job = jobQueue.pollFirstEntry(); + if (job != null) { + return CompletableFuture.completedFuture(job.getValue()); + } + + // There is currently nothing in the queue, so create an uncompleted future and queue it up to + // be completed when something does arrive. + CompletableFuture future = new CompletableFuture<>(); + futures.add(future); + return future; + } + // exists for tests synchronized CompactionJobQueues.MetaJob peek() { var firstEntry = jobQueue.firstEntry(); return firstEntry == null ? null : firstEntry.getValue(); } - public boolean isClosed() { - return closed.get(); - } - - public synchronized boolean closeIfEmpty() { - if (jobQueue.isEmpty()) { - closed.set(true); - return true; - } - - return false; - } - private void removePreviousSubmissions(KeyExtent extent) { TabletJobs prevJobs = tabletJobs.get(extent); if (prevJobs != null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index 8c462273573..b9fe1ed424c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.EnumMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap.KeySetView; import java.util.concurrent.atomic.AtomicLong; @@ -34,8 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - public class CompactionJobQueues { private static final Logger log = LoggerFactory.getLogger(CompactionJobQueues.class); @@ -157,23 +156,25 @@ public TabletMetadata getTabletMetadata() { } } + /** + * Asynchronously get a compaction job from the queue. If the queue currently has jobs then a + * completed future will be returned containing the highest priority job in the queue. If the + * queue is currently empty, then an uncompleted future will be returned and later when something + * is added to the queue the future will be completed. + */ + public CompletableFuture getAsync(CompactorGroupId groupId) { + var pq = priorityQueues.computeIfAbsent(groupId, + gid -> new CompactionJobPriorityQueue(gid, queueSize)); + return pq.getAsync(); + } + public MetaJob poll(CompactorGroupId groupId) { var prioQ = priorityQueues.get(groupId); if (prioQ == null) { return null; } - MetaJob mj = prioQ.poll(); - - if (mj == null) { - priorityQueues.computeIfPresent(groupId, (eid, pq) -> { - if (pq.closeIfEmpty()) { - return null; - } else { - return pq; - } - }); - } - return mj; + + return prioQ.poll(); } private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId, @@ -187,14 +188,7 @@ private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId, var pq = priorityQueues.computeIfAbsent(groupId, gid -> new CompactionJobPriorityQueue(gid, queueSize)); - while (pq.add(tabletMetadata, jobs, - currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) { - // When entering this loop its expected the queue is closed - Preconditions.checkState(pq.isClosed()); - // This loop handles race condition where poll() closes empty priority queues. The queue could - // be closed after its obtained from the map and before add is called. - pq = priorityQueues.computeIfAbsent(groupId, - gid -> new CompactionJobPriorityQueue(gid, queueSize)); - } + pq.add(tabletMetadata, jobs, + currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index 2e090c32a29..ddf9a3016e2 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -19,9 +19,7 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.HashSet; import java.util.List; @@ -209,58 +207,6 @@ public void testAddMoreThanMax() { } - @Test - public void testAddAfterClose() { - - CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class); - CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class); - CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class); - CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class); - - KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); - TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); - EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); - - CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); - EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); - EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes(); - EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes(); - - CompactionJob cj2 = EasyMock.createMock(CompactionJob.class); - EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes(); - EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes(); - EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes(); - - EasyMock.replay(tm, cj1, cj2); - - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); - assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L)); - - assertFalse(queue.closeIfEmpty()); - - EasyMock.verify(tm, cj1, cj2); - - assertEquals(5L, queue.getLowestPriority()); - assertEquals(2, queue.getMaxSize()); - assertEquals(0, queue.getDequeuedJobs()); - assertEquals(0, queue.getRejectedJobs()); - assertEquals(2, queue.getQueuedJobs()); - MetaJob job = queue.poll(); - assertEquals(cj1, job.getJob()); - assertEquals(tm, job.getTabletMetadata()); - assertEquals(1, queue.getDequeuedJobs()); - - MetaJob job2 = queue.poll(); - assertEquals(cj2, job2.getJob()); - assertEquals(tm, job2.getTabletMetadata()); - assertEquals(2, queue.getDequeuedJobs()); - - assertTrue(queue.closeIfEmpty()); - - assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L)); - - } - private static int counter = 1; private Pair createJob() { diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index 73aa4042956..a9f360b4cde 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -19,7 +19,9 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URI; import java.net.URISyntaxException; @@ -332,4 +334,57 @@ public void testAddPollRaceCondition() throws Exception { // The background threads should have seen every job that was added assertEquals(numToAdd, totalSeen); } + + @Test + public void testGetAsync() throws Exception { + CompactionJobQueues jobQueues = new CompactionJobQueues(100); + + var tid = TableId.of("1"); + var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); + var extent2 = new KeyExtent(tid, new Text("q"), new Text("l")); + var extent3 = new KeyExtent(tid, new Text("l"), new Text("c")); + var extent4 = new KeyExtent(tid, new Text("c"), new Text("a")); + + var tm1 = TabletMetadata.builder(extent1).build(); + var tm2 = TabletMetadata.builder(extent2).build(); + var tm3 = TabletMetadata.builder(extent3).build(); + var tm4 = TabletMetadata.builder(extent4).build(); + + var cg1 = CompactorGroupId.of("CG1"); + + var future1 = jobQueues.getAsync(cg1); + var future2 = jobQueues.getAsync(cg1); + + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + + var future3 = jobQueues.getAsync(cg1); + var future4 = jobQueues.getAsync(cg1); + + assertTrue(future1.isDone()); + assertTrue(future2.isDone()); + assertTrue(future3.isDone()); + assertTrue(future4.isDone()); + + assertEquals(extent1, future1.get().getTabletMetadata().getExtent()); + assertEquals(extent2, future2.get().getTabletMetadata().getExtent()); + assertEquals(extent4, future3.get().getTabletMetadata().getExtent()); + assertEquals(extent3, future4.get().getTabletMetadata().getExtent()); + + // test cancelling a future + var future5 = jobQueues.getAsync(cg1); + assertFalse(future5.isDone()); + future5.cancel(false); + var future6 = jobQueues.getAsync(cg1); + assertFalse(future6.isDone()); + // since future5 was canceled, this addition should go to future6 + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + assertTrue(future6.isDone()); + assertEquals(extent1, future6.get().getTabletMetadata().getExtent()); + } }