From 5b733349f33eedd632cc82cf8e19f2e3148b3188 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 27 Sep 2023 19:19:29 -0400 Subject: [PATCH] Implements missing compaction selection functionality Compaction selection functionality that opened files was not implemented because it opens user files in the manager. This change implements the functionality in order to get integration tests passing. There is already an open issue about finding a better way to do this. See #3526 --- .../compaction/CompactionPluginUtils.java | 64 ++++++++++++++++++- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java index 6f7529948fe..7978d3c2e5f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.compaction; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -34,6 +36,7 @@ import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; +import org.apache.accumulo.core.client.rfile.RFileSource; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.client.summary.Summary; @@ -47,14 +50,23 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; +import org.apache.accumulo.core.file.FileOperations; import 
org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.compaction.CompactionDispatcher; +import org.apache.accumulo.core.summary.Gatherer; +import org.apache.accumulo.core.summary.SummarizerFactory; +import org.apache.accumulo.core.summary.SummaryCollection; +import org.apache.accumulo.core.summary.SummaryReader; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,7 +139,32 @@ public Collection getAvailableFiles() { public Collection getSummaries(Collection files, Predicate summarySelector) { - throw new UnsupportedOperationException(); + // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid + // this. 
See #3526 + + try { + var tableConf = context.getTableConfiguration(extent.tableId()); + + SummaryCollection sc = new SummaryCollection(); + SummarizerFactory factory = new SummarizerFactory(tableConf); + for (CompactableFile cf : files) { + var file = CompactableFileImpl.toStoredTabletFile(cf); + FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath()); + Configuration conf = context.getHadoopConf(); + RFileSource source = new RFileSource(new FSDataInputStream(fs.open(file.getPath())), + fs.getFileStatus(file.getPath()).getLen(), file.getRange()); + + SummaryCollection fsc = SummaryReader + .load(conf, source, file.getFileName(), summarySelector, factory, + tableConf.getCryptoService()) + .getSummaries(Collections.singletonList(new Gatherer.RowRange(extent))); + + sc.merge(fsc, factory); + } + return sc.getSummaries(); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } } @Override @@ -141,9 +178,30 @@ public TabletId getTabletId() { } @Override - public Optional> getSample(CompactableFile file, + public Optional> getSample(CompactableFile cf, SamplerConfiguration sc) { - throw new UnsupportedOperationException(); + + // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid + // this. See #3526 + + try { + var file = CompactableFileImpl.toStoredTabletFile(cf); + FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath()); + Configuration conf = context.getHadoopConf(); + var tableConf = context.getTableConfiguration(extent.tableId()); + var iter = FileOperations.getInstance().newReaderBuilder() + .forFile(file, fs, conf, tableConf.getCryptoService()) + .withTableConfiguration(tableConf).seekToBeginning().build(); + var sampleIter = iter.getSample(new SamplerConfigurationImpl(sc)); + if (sampleIter == null) { + iter.close(); + return Optional.empty(); + } + + return Optional.of(sampleIter); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } } });