From 77163aa8db42dd86fc01ca5363506183d48e964d Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Wed, 21 Aug 2024 09:54:25 -0400 Subject: [PATCH 1/5] `IteratorEnvironment` fixes: closes #4810 - Fixed some inconsistencies and bugs in (non-test) implementations of `IteratorEnvironment` - Fixed `RFileScanner`s unexpected `UnsupportedOperationException` when calling `env.getServiceEnv()` or `env.getConfig()` (both deprecated) or `env.getPluginEnv()` - Changed the impls to return the same object for calls to `getServiceEnv()`/`getPluginEnv()` (expected by `IteratorEnvIT`) - Fixed inconsistencies in functionality of `isUserCompaction()` and `isFullMajorCompaction()` when it is not a major compaction. Added missing javadoc to `isUserCompaction()` - Added tests for these environments to `IteratorEnvIT` --- .../client/ClientSideIteratorScanner.java | 27 ++++-- .../core/client/rfile/RFileScanner.java | 65 ++++++++++++- .../core/clientImpl/OfflineIterator.java | 19 ++-- .../core/iterators/IteratorEnvironment.java | 3 +- .../apache/accumulo/test/IteratorEnvIT.java | 91 +++++++++++++++++-- 5 files changed, 180 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java index 362c4e85d62..4fe09b7881f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java @@ -37,6 +37,8 @@ import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl; import org.apache.accumulo.core.clientImpl.ScannerImpl; import org.apache.accumulo.core.clientImpl.ScannerOptions; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import 
org.apache.accumulo.core.data.Column; @@ -55,6 +57,8 @@ import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.hadoop.io.Text; +import com.google.common.base.Suppliers; + /** * A scanner that instantiates iterators on the client side instead of on the tablet server. This * can be useful for testing iterators or in cases where you don't want iterators affecting the @@ -91,10 +95,13 @@ private class ClientSideIteratorEnvironment implements IteratorEnvironment { private SamplerConfiguration samplerConfig; private boolean sampleEnabled; + private final Supplier serviceEnvironment; ClientSideIteratorEnvironment(boolean sampleEnabled, SamplerConfiguration samplerConfig) { this.sampleEnabled = sampleEnabled; this.samplerConfig = samplerConfig; + this.serviceEnvironment = + Suppliers.memoize(() -> new ClientServiceEnvironmentImpl(context.get())); } @Override @@ -111,7 +118,8 @@ public boolean isFullMajorCompaction() { @Override public boolean isUserCompaction() { - return false; + throw new IllegalStateException( + "Asked about user initiated compaction type when scope is " + getIteratorScope()); } @Override @@ -134,15 +142,22 @@ public SamplerConfiguration getSamplerConfiguration() { return samplerConfig; } - @Deprecated(since = "2.1.0") @Override - public ServiceEnvironment getServiceEnv() { - return new ClientServiceEnvironmentImpl(context.get()); + @Deprecated(since = "2.0.0") + public AccumuloConfiguration getConfig() { + var ctx = context.get(); + try { + return new ConfigurationCopy( + ctx.tableOperations().getConfiguration(ctx.getTableName(tableId.get()))); + } catch (AccumuloException | TableNotFoundException e) { + throw new RuntimeException("Error getting table configuration", e); + } } + @Deprecated(since = "2.1.0") @Override - public PluginEnvironment getPluginEnv() { - return new ClientServiceEnvironmentImpl(context.get()); + public ServiceEnvironment getServiceEnv() { + return serviceEnvironment.get(); } @Override diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 767ebc3ffae..429528caa87 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; @@ -66,13 +67,16 @@ import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.cache.CacheEntry; import org.apache.accumulo.core.spi.cache.CacheType; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoService; +import org.apache.accumulo.core.util.ConfigurationImpl; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; class RFileScanner extends ScannerOptions implements Scanner { @@ -311,6 +315,12 @@ public void updateScanIteratorOption(String iteratorName, String key, String val } private class IterEnv implements IteratorEnvironment { + private final Supplier serviceEnvironment; + + private IterEnv() { + this.serviceEnvironment = Suppliers.memoize(this::createServiceEnv); + } + @Override public IteratorScope getIteratorScope() { return IteratorScope.scan; @@ -318,7 +328,14 @@ public IteratorScope getIteratorScope() { @Override public boolean isFullMajorCompaction() { - return false; + throw 
new IllegalStateException( + "Asked about major compaction type when scope is " + getIteratorScope()); + } + + @Override + public boolean isUserCompaction() { + throw new IllegalStateException( + "Asked about user initiated compaction type when scope is " + getIteratorScope()); } @Override @@ -335,6 +352,52 @@ public boolean isSamplingEnabled() { public SamplerConfiguration getSamplerConfiguration() { return RFileScanner.this.getSamplerConfiguration(); } + + @Override + public TableId getTableId() { + return null; + } + + @Override + @Deprecated(since = "2.0.0") + public AccumuloConfiguration getConfig() { + return tableConf; + } + + @Override + @Deprecated(since = "2.1.0") + public ServiceEnvironment getServiceEnv() { + return serviceEnvironment.get(); + } + + private ServiceEnvironment createServiceEnv() { + return new ServiceEnvironment() { + @Override + public T instantiate(TableId tableId, String className, Class base) { + throw new UnsupportedOperationException(); + } + + @Override + public T instantiate(String className, Class base) { + throw new UnsupportedOperationException(); + } + + @Override + public String getTableName(TableId tableId) { + throw new UnsupportedOperationException(); + } + + @Override + public Configuration getConfiguration(TableId tableId) { + return new ConfigurationImpl(tableConf); + } + + @Override + public Configuration getConfiguration() { + throw new UnsupportedOperationException(); + } + }; + } } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java index a03cc811ab8..c84930c6744 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java @@ -31,10 +31,10 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map.Entry; +import java.util.function.Supplier; import 
org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -73,6 +73,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; +import com.google.common.base.Suppliers; + class OfflineIterator implements Iterator> { static class OfflineIteratorEnvironment implements IteratorEnvironment { @@ -83,6 +85,7 @@ static class OfflineIteratorEnvironment implements IteratorEnvironment { private final SamplerConfiguration sampleConf; private final ClientContext context; private final TableId tableId; + private final Supplier serviceEnvironment; public OfflineIteratorEnvironment(ClientContext context, TableId tableId, Authorizations auths, AccumuloConfiguration acuTableConf, boolean useSample, SamplerConfiguration samplerConf) { @@ -92,6 +95,7 @@ public OfflineIteratorEnvironment(ClientContext context, TableId tableId, Author this.conf = acuTableConf; this.useSample = useSample; this.sampleConf = samplerConf; + this.serviceEnvironment = Suppliers.memoize(() -> new ClientServiceEnvironmentImpl(context)); } @Deprecated(since = "2.0.0") @@ -107,12 +111,14 @@ public IteratorScope getIteratorScope() { @Override public boolean isFullMajorCompaction() { - return false; + throw new IllegalStateException( + "Asked about major compaction type when scope is " + getIteratorScope()); } @Override public boolean isUserCompaction() { - return false; + throw new IllegalStateException( + "Asked about user initiated compaction type when scope is " + getIteratorScope()); } private final ArrayList> topLevelIterators = @@ -160,12 +166,7 @@ public IteratorEnvironment cloneWithSamplingEnabled() { @Deprecated(since = "2.1.0") @Override public ServiceEnvironment 
getServiceEnv() { - return new ClientServiceEnvironmentImpl(context); - } - - @Override - public PluginEnvironment getPluginEnv() { - return new ClientServiceEnvironmentImpl(context); + return serviceEnvironment.get(); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java index 372a0e49a30..5e53062f283 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java @@ -140,7 +140,8 @@ default SamplerConfiguration getSamplerConfiguration() { } /** - * True if compaction was user initiated. + * True if compaction was user initiated. Will throw IllegalStateException if + * {@link #getIteratorScope()} != {@link IteratorScope#majc}. * * @since 2.0.0 */ diff --git a/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java b/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java index 05cc7a3a601..bd46a32b411 100644 --- a/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java +++ b/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java @@ -19,22 +19,28 @@ package org.apache.accumulo.test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.Iterator; import java.util.Map; +import java.util.Objects; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.Scanner; import 
org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.clientImpl.OfflineScanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; @@ -43,10 +49,15 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.FunctionalTestUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -147,7 +158,9 @@ public void init(SortedKeyValueIterator source, Map op */ private static void testEnv(IteratorScope scope, Map opts, IteratorEnvironment env) { - TableId expectedTableId = TableId.of(opts.get("expected.table.id")); + // In some cases, a table id won't be provided (e.g., testing the env of RFileScanner) + String tableIdStr = opts.get("expected.table.id"); + TableId expectedTableId = tableIdStr != null ? 
TableId.of(tableIdStr) : null; // verify getServiceEnv() and getPluginEnv() are the same objects, // so further checks only need to use getPluginEnv() @@ -172,9 +185,14 @@ private static void testEnv(IteratorScope scope, Map opts, if (!"value1".equals(tableConf.getTableCustom("iterator.env.test"))) { throw new RuntimeException("Test failed - Expected table property not found in table conf."); } - var systemConf = pluginEnv.getConfiguration(); - if (systemConf.get("table.custom.iterator.env.test") != null) { - throw new RuntimeException("Test failed - Unexpected table property found in system conf."); + if (!env.getClass().getName().contains("RFileScanner$IterEnv")) { + var systemConf = pluginEnv.getConfiguration(); + if (systemConf.get("table.custom.iterator.env.test") != null) { + throw new RuntimeException("Test failed - Unexpected table property found in system conf."); + } + } else { + // We expect RFileScanner's IterEnv to throw an UOE + assertThrows(UnsupportedOperationException.class, pluginEnv::getConfiguration); } // check other environment settings @@ -184,7 +202,7 @@ private static void testEnv(IteratorScope scope, Map opts, if (env.isSamplingEnabled()) { throw new RuntimeException("Test failed - isSamplingEnabled returned true, expected false"); } - if (!expectedTableId.equals(env.getTableId())) { + if (!Objects.equals(expectedTableId, env.getTableId())) { throw new RuntimeException("Test failed - Error getting Table ID"); } } @@ -203,10 +221,13 @@ public void finish() { @Test public void test() throws Exception { - String[] tables = getUniqueNames(3); + String[] tables = getUniqueNames(5); testScan(tables[0], ScanIter.class); - testCompact(tables[1], MajcIter.class); - testMinCompact(tables[2], MincIter.class); + testRFileScan(ScanIter.class); + testOfflineScan(tables[1], ScanIter.class); + testClientSideScan(tables[2], ScanIter.class); + testCompact(tables[3], MajcIter.class); + testMinCompact(tables[4], MincIter.class); } private void testScan(String 
tableName, @@ -222,6 +243,49 @@ private void testScan(String tableName, } } + private void testRFileScan(Class> iteratorClass) + throws Exception { + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + String rFilePath = createRFile(fs); + IteratorSetting is = new IteratorSetting(1, iteratorClass); + + try (Scanner scanner = RFile.newScanner().from(rFilePath).withFileSystem(fs) + .withTableProperties(getTableConfig().getProperties()).build()) { + scanner.addScanIterator(is); + var unused = scanner.iterator(); + } + } + + public void testOfflineScan(String tableName, + Class> iteratorClass) throws Exception { + writeData(tableName); + TableId tableId = getServerContext().getTableId(tableName); + getServerContext().tableOperations().offline(tableName, true); + IteratorSetting is = new IteratorSetting(1, iteratorClass); + is.addOption("expected.table.id", + getServerContext().tableOperations().tableIdMap().get(tableName)); + + try (OfflineScanner scanner = + new OfflineScanner(getServerContext(), tableId, new Authorizations())) { + scanner.addScanIterator(is); + var unused = scanner.iterator(); + } + } + + public void testClientSideScan(String tableName, + Class> iteratorClass) throws Exception { + writeData(tableName); + IteratorSetting is = new IteratorSetting(1, iteratorClass); + is.addOption("expected.table.id", + getServerContext().tableOperations().tableIdMap().get(tableName)); + + try (Scanner scanner = client.createScanner(tableName); + var clientIterScanner = new ClientSideIteratorScanner(scanner)) { + clientIterScanner.addScanIterator(is); + var unused = clientIterScanner.iterator(); + } + } + public void testCompact(String tableName, Class> iteratorClass) throws Exception { writeData(tableName); @@ -266,4 +330,15 @@ private void writeData(String tableName) throws Exception { bw.addMutation(m); } } + + private String createRFile(FileSystem fs) throws Exception { + Path dir = new Path(System.getProperty("user.dir") + 
"/target/rfilescan-iterenv-test/testrf"); + + FunctionalTestUtils.createRFiles(client, fs, dir.toString(), 1, 1, 1); + fs.deleteOnExit(dir); + + var listStatus = fs.listStatus(dir); + assertEquals(1, listStatus.length); + return Arrays.stream(fs.listStatus(dir)).findFirst().orElseThrow().getPath().toString(); + } } From e1ccea2c8f7fbb860239bbea13391521ba2de0de Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Thu, 22 Aug 2024 10:01:43 -0400 Subject: [PATCH 2/5] Review changes: - Updated RFile javadocs - Added description of new supported functionality for `RFile.ScannerOptions.withTableProperties()` - Moved `@see` tag in javadoc for `RFile.{ScannerOptions/SummaryOptions}.withTableProperties(Map)` which was incorrect - Changed `RFileScanner.IterEnv.getTableId()` to return a dummy table id to be used for calling `RFileScanner.IterEnv.getConfiguration(TableId)` - Added functionality for `RFileScanner.IterEnv.instantiate()` - Added error msg to invalid uses of `RFileScanner.IterEnv.getServiceEnv()/getPluginEnv()` methods --- .../accumulo/core/client/rfile/RFile.java | 27 ++++++++++++++----- .../core/client/rfile/RFileScanner.java | 24 ++++++++++++----- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java index 3b6d10aade1..a3fce00046b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -25,6 +25,7 @@ import java.util.Map.Entry; import java.util.function.Predicate; +import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -35,6 +36,8 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import 
org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; @@ -172,17 +175,27 @@ public interface ScannerOptions { * {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto * properties could be passed here. * + *
<p>
+ * Starting with {@code 2.1.4}, {@link PluginEnvironment#getConfiguration(TableId)} (obtained by + * {@link IteratorEnvironment#getPluginEnv()}) will return the properties passed in here. + * * @param props iterable over Accumulo table key value properties. * @return this */ ScannerOptions withTableProperties(Iterable> props); /** - * @see #withTableProperties(Iterable) Any property that impacts file behavior regardless of - * whether it has the {@link Property#TABLE_PREFIX} may be accepted and used. For example, - * cache and crypto properties could be passed here. + * Any property that impacts file behavior regardless of whether it has the + * {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto + * properties could be passed here. + * + *
<p>
+ * Starting with {@code 2.1.4}, {@link PluginEnvironment#getConfiguration(TableId)} (obtained by + * {@link IteratorEnvironment#getPluginEnv()}) will return the properties passed in here. + * * @param props a map instead of an Iterable * @return this + * @see #withTableProperties(Iterable) */ ScannerOptions withTableProperties(Map props); @@ -260,11 +273,13 @@ public interface SummaryOptions { SummaryOptions withTableProperties(Iterable> props); /** - * @see #withTableProperties(Iterable) Any property that impacts file behavior regardless of - * whether it has the {@link Property#TABLE_PREFIX} may be accepted and used. For example, - * cache and crypto properties could be passed here. + * Any property that impacts file behavior regardless of whether it has the + * {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto + * properties could be passed here. + * * @param props a map instead of an Iterable * @return this + * @see #withTableProperties(Iterable) */ SummaryOptions withTableProperties(Map props); diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 429528caa87..dc02681c051 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -82,6 +82,9 @@ class RFileScanner extends ScannerOptions implements Scanner { private static final byte[] EMPTY_BYTES = new byte[0]; private static final Range EMPTY_RANGE = new Range(); + private static final String errorMsg = + "This scanner is unrelated to any table or accumulo instance;" + + " it operates directly on files. 
Therefore, it can not support this operation."; private Range range; private BlockCacheManager blockCacheManager = null; @@ -92,6 +95,7 @@ class RFileScanner extends ScannerOptions implements Scanner { private long readaheadThreshold = 3; private AccumuloConfiguration tableConf; private CryptoService cryptoService; + private final TableId dummyTableId; static class Opts { InputArgs in; @@ -229,6 +233,7 @@ public void indexWeightChanged() {} } this.cryptoService = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, opts.tableConfig); + this.dummyTableId = TableId.of("RFileScannerFakeTableId"); } @Override @@ -355,7 +360,7 @@ public SamplerConfiguration getSamplerConfiguration() { @Override public TableId getTableId() { - return null; + return dummyTableId; } @Override @@ -373,28 +378,33 @@ public ServiceEnvironment getServiceEnv() { private ServiceEnvironment createServiceEnv() { return new ServiceEnvironment() { @Override - public T instantiate(TableId tableId, String className, Class base) { - throw new UnsupportedOperationException(); + public T instantiate(TableId tableId, String className, Class base) + throws ReflectiveOperationException { + return instantiate(className, base); } @Override - public T instantiate(String className, Class base) { - throw new UnsupportedOperationException(); + public T instantiate(String className, Class base) + throws ReflectiveOperationException { + return this.getClass().getClassLoader().loadClass(className).asSubclass(base) + .getDeclaredConstructor().newInstance(); } @Override public String getTableName(TableId tableId) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(errorMsg); } @Override public Configuration getConfiguration(TableId tableId) { + Preconditions.checkArgument(tableId.equals(getTableId()), "Expected " + getTableId() + + " but got " + tableId + " when requesting the table config"); return new ConfigurationImpl(tableConf); } @Override public 
Configuration getConfiguration() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(errorMsg); } }; } From 85ae622ace31d1786a0f97878dd01d020d40e55b Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Fri, 23 Aug 2024 16:10:12 -0400 Subject: [PATCH 3/5] review changes: - Minor RFileScanner changes - Refactored IteratorEnvIT to ensure expected assertions are executed --- .../core/client/rfile/RFileScanner.java | 7 +- .../apache/accumulo/test/IteratorEnvIT.java | 170 ++++++++++++------ 2 files changed, 124 insertions(+), 53 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index dc02681c051..e478b6b9af0 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -85,6 +85,7 @@ class RFileScanner extends ScannerOptions implements Scanner { private static final String errorMsg = "This scanner is unrelated to any table or accumulo instance;" + " it operates directly on files. 
Therefore, it can not support this operation."; + private static final TableId dummyTableId = TableId.of("RFileScannerFakeTableId"); private Range range; private BlockCacheManager blockCacheManager = null; @@ -95,7 +96,6 @@ class RFileScanner extends ScannerOptions implements Scanner { private long readaheadThreshold = 3; private AccumuloConfiguration tableConf; private CryptoService cryptoService; - private final TableId dummyTableId; static class Opts { InputArgs in; @@ -233,7 +233,6 @@ public void indexWeightChanged() {} } this.cryptoService = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, opts.tableConfig); - this.dummyTableId = TableId.of("RFileScannerFakeTableId"); } @Override @@ -392,7 +391,9 @@ public T instantiate(String className, Class base) @Override public String getTableName(TableId tableId) { - throw new UnsupportedOperationException(errorMsg); + Preconditions.checkArgument(tableId.equals(getTableId()), "Expected " + getTableId() + + " but got " + tableId + " when requesting the table config"); + return tableId.canonical(); } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java b/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java index bd46a32b411..29b48100e94 100644 --- a/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java +++ b/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java @@ -20,15 +20,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.File; import java.io.IOException; import java.time.Duration; -import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; -import java.util.Iterator; import java.util.Map; -import java.util.Objects; +import java.util.TreeMap; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -40,20 +40,20 @@ 
import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.clientImpl.OfflineScanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.WrappingIterator; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.FunctionalTestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -82,13 +82,15 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoo /** * Basic scan iterator to test IteratorEnvironment returns what is expected. */ - public static class ScanIter extends WrappingIterator { + public static class ScanIter extends Filter { IteratorScope scope = IteratorScope.scan; + String badColFam; @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { super.init(source, options, env); + this.badColFam = options.get("bad.col.fam"); testEnv(scope, options, env); // Checking for compaction on a scan should throw an error. 
@@ -103,18 +105,27 @@ public void init(SortedKeyValueIterator source, Map op "Test failed - Expected to throw IllegalStateException when checking compaction on a scan."); } catch (IllegalStateException e) {} } + + @Override + public boolean accept(Key k, Value v) { + // The only reason for filtering out some data is as a way to verify init() and testEnv() + // have been called + return !k.getColumnFamily().toString().equals(badColFam); + } } /** * Basic compaction iterator to test IteratorEnvironment returns what is expected. */ - public static class MajcIter extends WrappingIterator { + public static class MajcIter extends Filter { IteratorScope scope = IteratorScope.majc; + String badColFam; @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { super.init(source, options, env); + this.badColFam = options.get("bad.col.fam"); testEnv(scope, options, env); try { env.isUserCompaction(); @@ -127,18 +138,27 @@ public void init(SortedKeyValueIterator source, Map op throw new RuntimeException("Test failed"); } } + + @Override + public boolean accept(Key k, Value v) { + // The only reason for filtering out some data is as a way to verify init() and testEnv() + // have been called + return !k.getColumnFamily().toString().equals(badColFam); + } } /** * */ - public static class MincIter extends WrappingIterator { + public static class MincIter extends Filter { IteratorScope scope = IteratorScope.minc; + String badColFam; @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { super.init(source, options, env); + this.badColFam = options.get("bad.col.fam"); testEnv(scope, options, env); try { env.isUserCompaction(); @@ -151,6 +171,13 @@ public void init(SortedKeyValueIterator source, Map op "Test failed - Expected to throw IllegalStateException when checking compaction on a scan."); } catch (IllegalStateException e) {} } + + @Override + public boolean 
accept(Key k, Value v) { + // The only reason for filtering out some data is as a way to verify init() and testEnv() + // have been called + return !k.getColumnFamily().toString().equals(badColFam); + } } /** @@ -158,9 +185,7 @@ public void init(SortedKeyValueIterator source, Map op */ private static void testEnv(IteratorScope scope, Map opts, IteratorEnvironment env) { - // In some cases, a table id won't be provided (e.g., testing the env of RFileScanner) - String tableIdStr = opts.get("expected.table.id"); - TableId expectedTableId = tableIdStr != null ? TableId.of(tableIdStr) : null; + TableId expectedTableId = TableId.of(opts.get("expected.table.id")); // verify getServiceEnv() and getPluginEnv() are the same objects, // so further checks only need to use getPluginEnv() @@ -202,7 +227,7 @@ private static void testEnv(IteratorScope scope, Map opts, if (env.isSamplingEnabled()) { throw new RuntimeException("Test failed - isSamplingEnabled returned true, expected false"); } - if (!Objects.equals(expectedTableId, env.getTableId())) { + if (!expectedTableId.equals(env.getTableId())) { throw new RuntimeException("Test failed - Error getting Table ID"); } } @@ -223,7 +248,7 @@ public void finish() { public void test() throws Exception { String[] tables = getUniqueNames(5); testScan(tables[0], ScanIter.class); - testRFileScan(ScanIter.class); + testRFileScan("RFileScannerFakeTableId", ScanIter.class); testOfflineScan(tables[1], ScanIter.class); testClientSideScan(tables[2], ScanIter.class); testCompact(tables[3], MajcIter.class); @@ -236,53 +261,63 @@ private void testScan(String tableName, IteratorSetting cfg = new IteratorSetting(1, iteratorClass); cfg.addOption("expected.table.id", client.tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); + try (Scanner scan = client.createScanner(tableName)) { scan.addScanIterator(cfg); - Iterator> iter = scan.iterator(); - iter.forEachRemaining(e -> assertEquals("cf1", 
e.getKey().getColumnFamily().toString())); + validateScanner(scan); } } - private void testRFileScan(Class> iteratorClass) - throws Exception { + private void testRFileScan(String tableName, + Class> iteratorClass) throws Exception { + TreeMap data = createTestData(); LocalFileSystem fs = FileSystem.getLocal(new Configuration()); - String rFilePath = createRFile(fs); - IteratorSetting is = new IteratorSetting(1, iteratorClass); + String rFilePath = createRFile(fs, data); - try (Scanner scanner = RFile.newScanner().from(rFilePath).withFileSystem(fs) + IteratorSetting cfg = new IteratorSetting(1, iteratorClass); + cfg.addOption("expected.table.id", tableName); + cfg.addOption("bad.col.fam", "badcf"); + + try (Scanner scan = RFile.newScanner().from(rFilePath).withFileSystem(fs) .withTableProperties(getTableConfig().getProperties()).build()) { - scanner.addScanIterator(is); - var unused = scanner.iterator(); + scan.addScanIterator(cfg); + validateScanner(scan); } } public void testOfflineScan(String tableName, Class> iteratorClass) throws Exception { writeData(tableName); + TableId tableId = getServerContext().getTableId(tableName); getServerContext().tableOperations().offline(tableName, true); - IteratorSetting is = new IteratorSetting(1, iteratorClass); - is.addOption("expected.table.id", + + IteratorSetting cfg = new IteratorSetting(1, iteratorClass); + cfg.addOption("expected.table.id", getServerContext().tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); - try (OfflineScanner scanner = + try (OfflineScanner scan = new OfflineScanner(getServerContext(), tableId, new Authorizations())) { - scanner.addScanIterator(is); - var unused = scanner.iterator(); + scan.addScanIterator(cfg); + validateScanner(scan); } } public void testClientSideScan(String tableName, Class> iteratorClass) throws Exception { writeData(tableName); - IteratorSetting is = new IteratorSetting(1, iteratorClass); - is.addOption("expected.table.id", + + 
IteratorSetting cfg = new IteratorSetting(1, iteratorClass); + cfg.addOption("expected.table.id", getServerContext().tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); - try (Scanner scanner = client.createScanner(tableName); - var clientIterScanner = new ClientSideIteratorScanner(scanner)) { - clientIterScanner.addScanIterator(is); - var unused = clientIterScanner.iterator(); + try (Scanner scan = client.createScanner(tableName); + var clientIterScan = new ClientSideIteratorScanner(scan)) { + clientIterScan.addScanIterator(cfg); + validateScanner(clientIterScan); } } @@ -292,9 +327,14 @@ public void testCompact(String tableName, IteratorSetting cfg = new IteratorSetting(1, iteratorClass); cfg.addOption("expected.table.id", client.tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); CompactionConfig config = new CompactionConfig(); config.setIterators(Collections.singletonList(cfg)); client.tableOperations().compact(tableName, config); + + try (Scanner scan = client.createScanner(tableName)) { + validateScanner(scan); + } } public void testMinCompact(String tableName, @@ -303,10 +343,15 @@ public void testMinCompact(String tableName, IteratorSetting cfg = new IteratorSetting(1, iteratorClass); cfg.addOption("expected.table.id", client.tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); client.tableOperations().attachIterator(tableName, cfg, EnumSet.of(IteratorScope.minc)); - client.tableOperations().flush(tableName); + client.tableOperations().flush(tableName, null, null, true); + + try (Scanner scan = client.createScanner(tableName)) { + validateScanner(scan); + } } private NewTableConfiguration getTableConfig() { @@ -319,26 +364,51 @@ private void writeData(String tableName) throws Exception { client.tableOperations().create(tableName, getTableConfig()); try (BatchWriter bw = client.createBatchWriter(tableName)) { - Mutation m = new Mutation("row1"); - 
m.at().family("cf1").qualifier("cq1").put("val1"); - bw.addMutation(m); - m = new Mutation("row2"); - m.at().family("cf1").qualifier("cq1").put("val2"); - bw.addMutation(m); - m = new Mutation("row3"); - m.at().family("cf1").qualifier("cq1").put("val3"); - bw.addMutation(m); + for (Map.Entry data : createTestData().entrySet()) { + Mutation m = new Mutation(data.getKey().getRow()); + m.at().family(data.getKey().getColumnFamily()).qualifier(data.getKey().getColumnQualifier()) + .put(data.getValue()); + bw.addMutation(m); + } } } - private String createRFile(FileSystem fs) throws Exception { - Path dir = new Path(System.getProperty("user.dir") + "/target/rfilescan-iterenv-test/testrf"); + private TreeMap createTestData() { + TreeMap testData = new TreeMap<>(); - FunctionalTestUtils.createRFiles(client, fs, dir.toString(), 1, 1, 1); - fs.deleteOnExit(dir); + // Write data that we do not expect to be filtered out + testData.put(new Key("row1", "cf1", "cq1"), new Value("val1")); + testData.put(new Key("row2", "cf1", "cq1"), new Value("val2")); + testData.put(new Key("row3", "cf1", "cq1"), new Value("val3")); + // Write data that we expect to be filtered out + testData.put(new Key("row4", "badcf", "badcq"), new Value("val1")); + testData.put(new Key("row5", "badcf", "badcq"), new Value("val2")); + testData.put(new Key("row6", "badcf", "badcq"), new Value("val3")); - var listStatus = fs.listStatus(dir); - assertEquals(1, listStatus.length); - return Arrays.stream(fs.listStatus(dir)).findFirst().orElseThrow().getPath().toString(); + return testData; + } + + private String createRFile(FileSystem fs, TreeMap data) throws Exception { + File dir = new File(System.getProperty("user.dir") + "/target/rfilescan-iterenv-test"); + assertTrue(dir.mkdirs()); + String filePath = dir.getAbsolutePath() + "/test.rf"; + + try (RFileWriter writer = RFile.newWriter().to(filePath).withFileSystem(fs).build()) { + writer.append(data.entrySet()); + } + + fs.deleteOnExit(new 
Path(dir.getAbsolutePath())); + + return filePath; + } + + private void validateScanner(Scanner scan) { + // Ensure the badcf was filtered out to ensure init() and testEnv() were called + int numElts = 0; + for (var e : scan) { + numElts++; + assertEquals("cf1", e.getKey().getColumnFamily().toString()); + } + assertEquals(3, numElts); } } From 3660d3ace0c47c29fa97d5c8801be9e9ff92a265 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Tue, 27 Aug 2024 09:42:09 -0400 Subject: [PATCH 4/5] IteratorEnvironment javadoc fixes --- .../core/iterators/IteratorEnvironment.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java index 5e53062f283..114e731173b 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java @@ -43,7 +43,7 @@ default SortedKeyValueIterator reserveMapFileReader(String mapFileNam } /** - * @deprecated since 2.0.0. This method was using an unstable non public type. Use + * @deprecated since 2.0.0. This method was using an unstable, non-public type. Use * {@link #getPluginEnv()} */ @Deprecated(since = "2.0.0") @@ -52,16 +52,16 @@ default AccumuloConfiguration getConfig() { } /** - * Return the executed scope of the Iterator. Value will be one of the following: - * {@link IteratorScope#scan}, {@link IteratorScope#minc}, {@link IteratorScope#majc} + * @return the executed scope of the Iterator. Value will be one of the following: + * {@link IteratorScope#scan}, {@link IteratorScope#minc}, {@link IteratorScope#majc} */ default IteratorScope getIteratorScope() { throw new UnsupportedOperationException(); } /** - * Return true if the compaction is a full major compaction. 
Will throw IllegalStateException if - * {@link #getIteratorScope()} != {@link IteratorScope#majc}. + * @return true if the compaction is a full major compaction; false otherwise + * @throws IllegalStateException if {@link #getIteratorScope()} != {@link IteratorScope#majc}. */ default boolean isFullMajorCompaction() { throw new UnsupportedOperationException(); @@ -76,8 +76,9 @@ default void registerSideChannel(SortedKeyValueIterator iter) { } /** - * Return the Scan Authorizations used in this Iterator. Will throw UnsupportedOperationException - * if {@link #getIteratorScope()} != {@link IteratorScope#scan}. + * @return the Scan Authorizations used in this Iterator. + * @throws UnsupportedOperationException if {@link #getIteratorScope()} != + * {@link IteratorScope#scan}. */ default Authorizations getAuthorizations() { throw new UnsupportedOperationException(); @@ -123,7 +124,7 @@ default IteratorEnvironment cloneWithSamplingEnabled() { * is for a deep copy created with an environment created by calling * {@link #cloneWithSamplingEnabled()} * - * @return true if sampling is enabled for this environment. + * @return true if sampling is enabled for this environment; false otherwise * @since 1.8.0 */ default boolean isSamplingEnabled() { @@ -132,7 +133,7 @@ default boolean isSamplingEnabled() { /** * - * @return sampling configuration is sampling is enabled for environment, otherwise returns null. + * @return sampling configuration if sampling is enabled for environment, otherwise null. * @since 1.8.0 */ default SamplerConfiguration getSamplerConfiguration() { @@ -140,8 +141,8 @@ default SamplerConfiguration getSamplerConfiguration() { } /** - * True if compaction was user initiated. Will throw IllegalStateException if - * {@link #getIteratorScope()} != {@link IteratorScope#majc}. + * @return true if compaction was user initiated; false otherwise + * @throws IllegalStateException if {@link #getIteratorScope()} != {@link IteratorScope#majc}. 
* * @since 2.0.0 */ @@ -158,7 +159,7 @@ default boolean isUserCompaction() { * * * @since 2.0.0 - * @deprecated since 2.1.0. This method was using a non public API type. Use + * @deprecated since 2.1.0. This method was using a non-public API type. Use * {@link #getPluginEnv()} instead because it has better stability guarantees. */ @Deprecated(since = "2.1.0") @@ -181,7 +182,7 @@ default PluginEnvironment getPluginEnv() { } /** - * Return the table Id associated with this iterator. + * @return the table Id associated with this iterator. * * @since 2.0.0 */ From a68ec11693a57a09c36eecea2cdb70c2c18acfc9 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Thu, 12 Sep 2024 16:01:14 -0400 Subject: [PATCH 5/5] Changes: - Changed ClientSideIteratorScanner.getConfig() to use the safer getPluginEnv() - Added getPluginEnv() impl to all IteratorEnvironments (besides test envs) - Improved RFile.ScannerOptions.withTableProperties() javadoc to better explain how iterators will have access to these properties - Opted for RFileScanner.IterEnv.getTableId() to return null instead of a dummy id - Removed incorrect impl of RFileScanner.IterEnv.get(Service/Plugin)Env().getTableName() to instead just throw UnsupOpExc since the method doesn't make sense anyways. 
Co-authored-by: Christopher Tubbs --- .../client/ClientSideIteratorScanner.java | 13 +++++----- .../accumulo/core/client/rfile/RFile.java | 10 +++---- .../core/client/rfile/RFileScanner.java | 26 +++++++++++++------ .../core/clientImpl/OfflineIterator.java | 6 +++++ .../iterators/TabletIteratorEnvironment.java | 6 +++++ .../apache/accumulo/test/IteratorEnvIT.java | 13 +++++----- 6 files changed, 47 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java index 6f31556a114..8da75b53c91 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java @@ -145,13 +145,7 @@ public SamplerConfiguration getSamplerConfiguration() { @Override @Deprecated(since = "2.0.0") public AccumuloConfiguration getConfig() { - var ctx = context.get(); - try { - return new ConfigurationCopy( - ctx.tableOperations().getConfiguration(ctx.getTableName(tableId.get()))); - } catch (AccumuloException | TableNotFoundException e) { - throw new RuntimeException("Error getting table configuration", e); - } + return new ConfigurationCopy(getPluginEnv().getConfiguration(getTableId())); } @Deprecated(since = "2.1.0") @@ -160,6 +154,11 @@ public ServiceEnvironment getServiceEnv() { return serviceEnvironment.get(); } + @Override + public PluginEnvironment getPluginEnv() { + return serviceEnvironment.get(); + } + @Override public TableId getTableId() { return tableId.get(); diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java index a3fce00046b..03feca5243b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -176,8 +176,10 @@ public 
interface ScannerOptions { * properties could be passed here. * *

- * Starting with {@code 2.1.4}, {@link PluginEnvironment#getConfiguration(TableId)} (obtained by - * {@link IteratorEnvironment#getPluginEnv()}) will return the properties passed in here. + * Configured iterators will have access to these properties via the + * {@link PluginEnvironment#getConfiguration(TableId)} (obtained by + * {@link IteratorEnvironment#getPluginEnv()}). The tableId used to get the configuration should + * be the one returned programmatically from {@link IteratorEnvironment#getTableId()}. * * @param props iterable over Accumulo table key value properties. * @return this @@ -189,10 +191,6 @@ public interface ScannerOptions { * {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto * properties could be passed here. * - *

- * Starting with {@code 2.1.4}, {@link PluginEnvironment#getConfiguration(TableId)} (obtained by - * {@link IteratorEnvironment#getPluginEnv()}) will return the properties passed in here. - * * @param props a map instead of an Iterable * @return this * @see #withTableProperties(Iterable) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 822c192302a..f76285dfd3d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -31,6 +31,7 @@ import java.util.function.Supplier; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -85,8 +86,6 @@ class RFileScanner extends ScannerOptions implements Scanner { private static final String errorMsg = "This scanner is unrelated to any table or accumulo instance;" + " it operates directly on files. Therefore, it can not support this operation."; - private static final TableId dummyTableId = TableId.of("RFileScannerFakeTableId"); - private Range range; private BlockCacheManager blockCacheManager = null; private BlockCache dataCache = null; @@ -357,9 +356,16 @@ public SamplerConfiguration getSamplerConfiguration() { return RFileScanner.this.getSamplerConfiguration(); } + /** + * This method only exists to be used as described in {@link IteratorEnvironment#getPluginEnv()} + * so the table config can be obtained. This simply returns null since a table id does not make + * sense in the context of scanning RFiles, but is needed to obtain the table configuration. 
+ * + * @return null + */ @Override public TableId getTableId() { - return dummyTableId; + return null; } @Override @@ -374,6 +380,11 @@ public ServiceEnvironment getServiceEnv() { return serviceEnvironment.get(); } + @Override + public PluginEnvironment getPluginEnv() { + return serviceEnvironment.get(); + } + private ServiceEnvironment createServiceEnv() { return new ServiceEnvironment() { @Override @@ -391,15 +402,14 @@ public T instantiate(String className, Class base) @Override public String getTableName(TableId tableId) { - Preconditions.checkArgument(tableId.equals(getTableId()), "Expected " + getTableId() - + " but got " + tableId + " when requesting the table config"); - return tableId.canonical(); + throw new UnsupportedOperationException(errorMsg); } @Override public Configuration getConfiguration(TableId tableId) { - Preconditions.checkArgument(tableId.equals(getTableId()), "Expected " + getTableId() - + " but got " + tableId + " when requesting the table config"); + Preconditions.checkArgument(tableId == getTableId(), + "Expected tableId obtained from IteratorEnvironment.getTableId() but got " + tableId + + " when requesting the table config"); return new ConfigurationImpl(tableConf); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java index c84930c6744..ce1bb2d26e7 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -169,6 
+170,11 @@ public ServiceEnvironment getServiceEnv() { return serviceEnvironment.get(); } + @Override + public PluginEnvironment getPluginEnv() { + return serviceEnvironment.get(); + } + @Override public TableId getTableId() { return tableId; diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java index 63970b2943a..d6522315d2f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java +++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; +import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -232,6 +233,11 @@ public ServiceEnvironment getServiceEnv() { return serviceEnvironment; } + @Override + public PluginEnvironment getPluginEnv() { + return serviceEnvironment; + } + @Override public TableId getTableId() { return tableId; diff --git a/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java b/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java index 29b48100e94..00e0ce25c38 100644 --- a/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java +++ b/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java @@ -185,7 +185,8 @@ public boolean accept(Key k, Value v) { */ private static void testEnv(IteratorScope scope, Map opts, IteratorEnvironment env) { - TableId expectedTableId = TableId.of(opts.get("expected.table.id")); + String expTableIdStr = opts.get("expected.table.id"); + TableId expTableId = expTableIdStr == null ? 
null : TableId.of(expTableIdStr); // verify getServiceEnv() and getPluginEnv() are the same objects, // so further checks only need to use getPluginEnv() @@ -227,7 +228,7 @@ private static void testEnv(IteratorScope scope, Map opts, if (env.isSamplingEnabled()) { throw new RuntimeException("Test failed - isSamplingEnabled returned true, expected false"); } - if (!expectedTableId.equals(env.getTableId())) { + if (expTableId != null && !expTableId.equals(env.getTableId())) { throw new RuntimeException("Test failed - Error getting Table ID"); } } @@ -248,7 +249,8 @@ public void finish() { public void test() throws Exception { String[] tables = getUniqueNames(5); testScan(tables[0], ScanIter.class); - testRFileScan("RFileScannerFakeTableId", ScanIter.class); + // No table id when scanning at file level + testRFileScan(ScanIter.class); testOfflineScan(tables[1], ScanIter.class); testClientSideScan(tables[2], ScanIter.class); testCompact(tables[3], MajcIter.class); @@ -269,14 +271,13 @@ private void testScan(String tableName, } } - private void testRFileScan(String tableName, - Class> iteratorClass) throws Exception { + private void testRFileScan(Class> iteratorClass) + throws Exception { TreeMap data = createTestData(); LocalFileSystem fs = FileSystem.getLocal(new Configuration()); String rFilePath = createRFile(fs, data); IteratorSetting cfg = new IteratorSetting(1, iteratorClass); - cfg.addOption("expected.table.id", tableName); cfg.addOption("bad.col.fam", "badcf"); try (Scanner scan = RFile.newScanner().from(rFilePath).withFileSystem(fs)