From f977aaf41bb88471ab5c4dfe5fd1a4d81bd71a38 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 3 Jul 2024 22:03:54 -0400 Subject: [PATCH 1/3] Adds completable futures to compaction queue Adds completable futures to the queue of compaction jobs. This allows for async notification when something is added to the queue. Previously, the compaction queues code would drop queues that became empty. The concept of a queue being empty became more complex with this change: a queue would only be considered empty when it had no pending futures and no queued jobs. This more complex notion of emptiness would have made the code for dropping empty queues more complex. Instead of increasing the complexity of that code, this change stops removing empty queues. This means that if a compaction group is used and then no longer used, a small empty data structure will remain in the map for the lifetime of the process. That is unlikely to cause memory issues. Therefore, the increased complexity was judged not worthwhile, given that the leftover data structures are unlikely to cause memory problems. 
--- .../queue/CompactionJobPriorityQueue.java | 55 +++++++++++-------- .../compaction/queue/CompactionJobQueues.java | 40 ++++++-------- .../queue/CompactionJobPriorityQueueTest.java | 54 ------------------ .../queue/CompactionJobQueuesTest.java | 55 +++++++++++++++++++ 4 files changed, 103 insertions(+), 101 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 4dfd6868ad4..9909ccb7f9a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -29,7 +30,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -105,6 +106,7 @@ public boolean equals(Object o) { private final int maxSize; private final AtomicLong rejectedJobs; private final AtomicLong dequeuedJobs; + private final ArrayDeque> futures; private static class TabletJobs { final long generation; @@ -122,8 +124,6 @@ private TabletJobs(long generation, HashSet jobs) { private final AtomicLong nextSeq = new AtomicLong(0); - private final AtomicBoolean closed = new AtomicBoolean(false); - public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { this.jobQueue = new TreeMap<>(); this.maxSize = maxSize; @@ -131,13 +131,10 @@ public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { this.groupId = groupId; 
this.rejectedJobs = new AtomicLong(0); this.dequeuedJobs = new AtomicLong(0); + this.futures = new ArrayDeque<>(); } public synchronized void removeOlderGenerations(Ample.DataLevel level, long currGeneration) { - if (closed.get()) { - return; - } - List removals = new ArrayList<>(); tabletJobs.forEach((extent, jobs) -> { @@ -160,16 +157,26 @@ public synchronized void removeOlderGenerations(Ample.DataLevel level, long curr public synchronized int add(TabletMetadata tabletMetadata, Collection jobs, long generation) { Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId))); - if (closed.get()) { - return -1; - } removePreviousSubmissions(tabletMetadata.getExtent()); HashSet newEntries = new HashSet<>(jobs.size()); int jobsAdded = 0; - for (CompactionJob job : jobs) { + outer: for (CompactionJob job : jobs) { + var future = futures.poll(); + while (future != null) { + // its expected that if futures are present then the queue is empty, if this is not true + // then there is a bug + Preconditions.checkState(jobQueue.isEmpty()); + if (future.complete(new CompactionJobQueues.MetaJob(job, tabletMetadata))) { + // successfully completed a future with this job, so do not need to queue the job + jobsAdded++; + continue outer; + } // else the future was canceled or timed out so could not complete it + future = futures.poll(); + } + CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job); if (cjqpKey != null) { checkState(newEntries.add(cjqpKey)); @@ -227,25 +234,25 @@ public synchronized CompactionJobQueues.MetaJob poll() { return first == null ? null : first.getValue(); } + public synchronized CompletableFuture getAsync() { + var job = jobQueue.pollFirstEntry(); + if (job != null) { + return CompletableFuture.completedFuture(job.getValue()); + } + + // There is currently nothing in the queue, so create an uncompleted future and queue it up to + // be completed when something does arrive. 
+ CompletableFuture future = new CompletableFuture<>(); + futures.add(future); + return future; + } + // exists for tests synchronized CompactionJobQueues.MetaJob peek() { var firstEntry = jobQueue.firstEntry(); return firstEntry == null ? null : firstEntry.getValue(); } - public boolean isClosed() { - return closed.get(); - } - - public synchronized boolean closeIfEmpty() { - if (jobQueue.isEmpty()) { - closed.set(true); - return true; - } - - return false; - } - private void removePreviousSubmissions(KeyExtent extent) { TabletJobs prevJobs = tabletJobs.get(extent); if (prevJobs != null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index 8c462273573..b9fe1ed424c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.EnumMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap.KeySetView; import java.util.concurrent.atomic.AtomicLong; @@ -34,8 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - public class CompactionJobQueues { private static final Logger log = LoggerFactory.getLogger(CompactionJobQueues.class); @@ -157,23 +156,25 @@ public TabletMetadata getTabletMetadata() { } } + /** + * Asynchronously get a compaction job from the queue. If the queue currently has jobs then a + * completed future will be returned containing the highest priority job in the queue. 
If the + * queue is currently empty, then an uncompleted future will be returned and later when something + * is added to the queue the future will be completed. + */ + public CompletableFuture getAsync(CompactorGroupId groupId) { + var pq = priorityQueues.computeIfAbsent(groupId, + gid -> new CompactionJobPriorityQueue(gid, queueSize)); + return pq.getAsync(); + } + public MetaJob poll(CompactorGroupId groupId) { var prioQ = priorityQueues.get(groupId); if (prioQ == null) { return null; } - MetaJob mj = prioQ.poll(); - - if (mj == null) { - priorityQueues.computeIfPresent(groupId, (eid, pq) -> { - if (pq.closeIfEmpty()) { - return null; - } else { - return pq; - } - }); - } - return mj; + + return prioQ.poll(); } private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId, @@ -187,14 +188,7 @@ private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId, var pq = priorityQueues.computeIfAbsent(groupId, gid -> new CompactionJobPriorityQueue(gid, queueSize)); - while (pq.add(tabletMetadata, jobs, - currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) { - // When entering this loop its expected the queue is closed - Preconditions.checkState(pq.isClosed()); - // This loop handles race condition where poll() closes empty priority queues. The queue could - // be closed after its obtained from the map and before add is called. 
- pq = priorityQueues.computeIfAbsent(groupId, - gid -> new CompactionJobPriorityQueue(gid, queueSize)); - } + pq.add(tabletMetadata, jobs, + currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index 2e090c32a29..ddf9a3016e2 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -19,9 +19,7 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.HashSet; import java.util.List; @@ -209,58 +207,6 @@ public void testAddMoreThanMax() { } - @Test - public void testAddAfterClose() { - - CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class); - CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class); - CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class); - CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class); - - KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); - TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); - EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); - - CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); - EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); - EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes(); - EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes(); - - CompactionJob cj2 = 
EasyMock.createMock(CompactionJob.class); - EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes(); - EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes(); - EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes(); - - EasyMock.replay(tm, cj1, cj2); - - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); - assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L)); - - assertFalse(queue.closeIfEmpty()); - - EasyMock.verify(tm, cj1, cj2); - - assertEquals(5L, queue.getLowestPriority()); - assertEquals(2, queue.getMaxSize()); - assertEquals(0, queue.getDequeuedJobs()); - assertEquals(0, queue.getRejectedJobs()); - assertEquals(2, queue.getQueuedJobs()); - MetaJob job = queue.poll(); - assertEquals(cj1, job.getJob()); - assertEquals(tm, job.getTabletMetadata()); - assertEquals(1, queue.getDequeuedJobs()); - - MetaJob job2 = queue.poll(); - assertEquals(cj2, job2.getJob()); - assertEquals(tm, job2.getTabletMetadata()); - assertEquals(2, queue.getDequeuedJobs()); - - assertTrue(queue.closeIfEmpty()); - - assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L)); - - } - private static int counter = 1; private Pair createJob() { diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index 73aa4042956..72d084af0f9 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -19,7 +19,9 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import 
java.net.URI; import java.net.URISyntaxException; @@ -332,4 +334,57 @@ public void testAddPollRaceCondition() throws Exception { // The background threads should have seen every job that was added assertEquals(numToAdd, totalSeen); } + + @Test + public void testGetAsync() throws URISyntaxException { + CompactionJobQueues jobQueues = new CompactionJobQueues(100); + + var tid = TableId.of("1"); + var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); + var extent2 = new KeyExtent(tid, new Text("q"), new Text("l")); + var extent3 = new KeyExtent(tid, new Text("l"), new Text("c")); + var extent4 = new KeyExtent(tid, new Text("c"), new Text("a")); + + var tm1 = TabletMetadata.builder(extent1).build(); + var tm2 = TabletMetadata.builder(extent2).build(); + var tm3 = TabletMetadata.builder(extent3).build(); + var tm4 = TabletMetadata.builder(extent4).build(); + + var cg1 = CompactorGroupId.of("CG1"); + + var future1 = jobQueues.getAsync(cg1); + var future2 = jobQueues.getAsync(cg1); + + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + + var future3 = jobQueues.getAsync(cg1); + var future4 = jobQueues.getAsync(cg1); + + assertTrue(future1.isDone()); + assertTrue(future2.isDone()); + assertTrue(future3.isDone()); + assertTrue(future4.isDone()); + + assertEquals(extent1, future1.getNow(null).getTabletMetadata().getExtent()); + assertEquals(extent2, future2.getNow(null).getTabletMetadata().getExtent()); + assertEquals(extent4, future3.getNow(null).getTabletMetadata().getExtent()); + assertEquals(extent3, future4.getNow(null).getTabletMetadata().getExtent()); + + // test cancelling a future + var future5 = jobQueues.getAsync(cg1); + assertFalse(future5.isDone()); + future5.cancel(false); + var future6 = 
jobQueues.getAsync(cg1); + assertFalse(future6.isDone()); + // since future5 was canceled, this addition should go to future6 + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + assertTrue(future6.isDone()); + assertEquals(extent1, future6.getNow(null).getTabletMetadata().getExtent()); + } } From 22bc1d5fb6e9b1dc8ab83cad127bdf1132f38d63 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 3 Jul 2024 22:20:38 -0400 Subject: [PATCH 2/3] fix build error --- .../compaction/queue/CompactionJobQueuesTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index 72d084af0f9..02b482fa6f8 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -336,7 +336,7 @@ public void testAddPollRaceCondition() throws Exception { } @Test - public void testGetAsync() throws URISyntaxException { + public void testGetAsync() throws Exception { CompactionJobQueues jobQueues = new CompactionJobQueues(100); var tid = TableId.of("1"); @@ -371,10 +371,10 @@ public void testGetAsync() throws URISyntaxException { assertTrue(future3.isDone()); assertTrue(future4.isDone()); - assertEquals(extent1, future1.getNow(null).getTabletMetadata().getExtent()); - assertEquals(extent2, future2.getNow(null).getTabletMetadata().getExtent()); - assertEquals(extent4, future3.getNow(null).getTabletMetadata().getExtent()); - assertEquals(extent3, future4.getNow(null).getTabletMetadata().getExtent()); + assertEquals(extent1, future1.get().getTabletMetadata().getExtent()); + assertEquals(extent2, future2.get().getTabletMetadata().getExtent()); + assertEquals(extent4, 
future3.get().getTabletMetadata().getExtent()); + assertEquals(extent3, future4.get().getTabletMetadata().getExtent()); // test cancelling a future var future5 = jobQueues.getAsync(cg1); From 75dd4f04037a5a5bb15f5a79527c1f8e61b48e23 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 3 Jul 2024 22:34:14 -0400 Subject: [PATCH 3/3] fix build --- .../manager/compaction/queue/CompactionJobQueuesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index 02b482fa6f8..a9f360b4cde 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -385,6 +385,6 @@ public void testGetAsync() throws Exception { // since future5 was canceled, this addition should go to future6 jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); assertTrue(future6.isDone()); - assertEquals(extent1, future6.getNow(null).getTabletMetadata().getExtent()); + assertEquals(extent1, future6.get().getTabletMetadata().getExtent()); } }