diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java index 6f7529948fe..7978d3c2e5f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.compaction; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -34,6 +36,7 @@ import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; +import org.apache.accumulo.core.client.rfile.RFileSource; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.client.summary.Summary; @@ -47,14 +50,23 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; +import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.compaction.CompactionDispatcher; +import org.apache.accumulo.core.summary.Gatherer; +import org.apache.accumulo.core.summary.SummarizerFactory; +import org.apache.accumulo.core.summary.SummaryCollection; +import org.apache.accumulo.core.summary.SummaryReader; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,7 +139,32 @@ public Collection getAvailableFiles() { public Collection getSummaries(Collection files, Predicate summarySelector) { - throw new UnsupportedOperationException(); + // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid + // this. See #3526 + + try { + var tableConf = context.getTableConfiguration(extent.tableId()); + + SummaryCollection sc = new SummaryCollection(); + SummarizerFactory factory = new SummarizerFactory(tableConf); + for (CompactableFile cf : files) { + var file = CompactableFileImpl.toStoredTabletFile(cf); + FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath()); + Configuration conf = context.getHadoopConf(); + RFileSource source = new RFileSource(new FSDataInputStream(fs.open(file.getPath())), + fs.getFileStatus(file.getPath()).getLen(), file.getRange()); + + SummaryCollection fsc = SummaryReader + .load(conf, source, file.getFileName(), summarySelector, factory, + tableConf.getCryptoService()) + .getSummaries(Collections.singletonList(new Gatherer.RowRange(extent))); + + sc.merge(fsc, factory); + } + return sc.getSummaries(); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } } @Override @@ -141,9 +178,30 @@ public TabletId getTabletId() { } @Override - public Optional> getSample(CompactableFile file, + public Optional> getSample(CompactableFile cf, SamplerConfiguration sc) { - throw new UnsupportedOperationException(); + + // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid + // this. See #3526 + + try { + var file = CompactableFileImpl.toStoredTabletFile(cf); + FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath()); + Configuration conf = context.getHadoopConf(); + var tableConf = context.getTableConfiguration(extent.tableId()); + var iter = FileOperations.getInstance().newReaderBuilder() + .forFile(file, fs, conf, tableConf.getCryptoService()) + .withTableConfiguration(tableConf).seekToBeginning().build(); + var sampleIter = iter.getSample(new SamplerConfigurationImpl(sc)); + if (sampleIter == null) { + iter.close(); + return Optional.empty(); + } + + return Optional.of(sampleIter); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } } });