diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java index 217a3bb31cd..8da75b53c91 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java @@ -37,6 +37,8 @@ import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl; import org.apache.accumulo.core.clientImpl.ScannerImpl; import org.apache.accumulo.core.clientImpl.ScannerOptions; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Column; @@ -55,6 +57,8 @@ import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.hadoop.io.Text; +import com.google.common.base.Suppliers; + /** * A scanner that instantiates iterators on the client side instead of on the tablet server. This * can be useful for testing iterators or in cases where you don't want iterators affecting the @@ -91,10 +95,13 @@ private class ClientSideIteratorEnvironment implements IteratorEnvironment { private final SamplerConfiguration samplerConfig; private final boolean sampleEnabled; + private final Supplier serviceEnvironment; ClientSideIteratorEnvironment(boolean sampleEnabled, SamplerConfiguration samplerConfig) { this.sampleEnabled = sampleEnabled; this.samplerConfig = samplerConfig; + this.serviceEnvironment = + Suppliers.memoize(() -> new ClientServiceEnvironmentImpl(context.get())); } @Override @@ -111,7 +118,8 @@ public boolean isFullMajorCompaction() { @Override public boolean isUserCompaction() { - return false; + throw new IllegalStateException( + "Asked about user initiated compaction type when scope is " + getIteratorScope()); } @Override @@ -134,15 +142,21 @@ public SamplerConfiguration getSamplerConfiguration() { return samplerConfig; } + @Override + @Deprecated(since = "2.0.0") + public AccumuloConfiguration getConfig() { + return new ConfigurationCopy(getPluginEnv().getConfiguration(getTableId())); + } + @Deprecated(since = "2.1.0") @Override public ServiceEnvironment getServiceEnv() { - return new ClientServiceEnvironmentImpl(context.get()); + return serviceEnvironment.get(); } @Override public PluginEnvironment getPluginEnv() { - return new ClientServiceEnvironmentImpl(context.get()); + return serviceEnvironment.get(); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java index 3b6d10aade1..03feca5243b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -25,6 +25,7 @@ import java.util.Map.Entry; import java.util.function.Predicate; +import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -35,6 +36,8 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; @@ -172,17 +175,25 @@ public interface ScannerOptions { * {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto * properties could be passed here. * + *

+ * Configured iterators will have access to these properties via the + * {@link PluginEnvironment#getConfiguration(TableId)} (obtained by + * {@link IteratorEnvironment#getPluginEnv()}). The tableId used to get the configuration should + * be the one returned programmatically from {@link IteratorEnvironment#getTableId()}. + * * @param props iterable over Accumulo table key value properties. * @return this */ ScannerOptions withTableProperties(Iterable> props); /** - * @see #withTableProperties(Iterable) Any property that impacts file behavior regardless of - * whether it has the {@link Property#TABLE_PREFIX} may be accepted and used. For example, - * cache and crypto properties could be passed here. + * Any property that impacts file behavior regardless of whether it has the + * {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto + * properties could be passed here. + * * @param props a map instead of an Iterable * @return this + * @see #withTableProperties(Iterable) */ ScannerOptions withTableProperties(Map props); @@ -260,11 +271,13 @@ public interface SummaryOptions { SummaryOptions withTableProperties(Iterable> props); /** - * @see #withTableProperties(Iterable) Any property that impacts file behavior regardless of - * whether it has the {@link Property#TABLE_PREFIX} may be accepted and used. For example, - * cache and crypto properties could be passed here. + * Any property that impacts file behavior regardless of whether it has the + * {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto + * properties could be passed here. + * * @param props a map instead of an Iterable * @return this + * @see #withTableProperties(Iterable) */ SummaryOptions withTableProperties(Map props); diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 36d58612d1d..f76285dfd3d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -31,6 +31,7 @@ import java.util.function.Supplier; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -44,6 +45,7 @@ import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; @@ -66,19 +68,24 @@ import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.cache.CacheEntry; import org.apache.accumulo.core.spi.cache.CacheType; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoService; +import org.apache.accumulo.core.util.ConfigurationImpl; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; class RFileScanner extends ScannerOptions implements Scanner { private static final byte[] EMPTY_BYTES = new byte[0]; private static final Range EMPTY_RANGE = new Range(); - + private static final String errorMsg = + "This scanner is unrelated to any table or accumulo instance;" + + " it operates directly on files. Therefore, it can not support this operation."; private Range range; private BlockCacheManager blockCacheManager = null; private BlockCache dataCache = null; @@ -311,6 +318,12 @@ public void updateScanIteratorOption(String iteratorName, String key, String val } private class IterEnv implements IteratorEnvironment { + private final Supplier serviceEnvironment; + + private IterEnv() { + this.serviceEnvironment = Suppliers.memoize(this::createServiceEnv); + } + @Override public IteratorScope getIteratorScope() { return IteratorScope.scan; @@ -318,7 +331,14 @@ public IteratorScope getIteratorScope() { @Override public boolean isFullMajorCompaction() { - return false; + throw new IllegalStateException( + "Asked about major compaction type when scope is " + getIteratorScope()); + } + + @Override + public boolean isUserCompaction() { + throw new IllegalStateException( + "Asked about user initiated compaction type when scope is " + getIteratorScope()); } @Override @@ -335,6 +355,70 @@ public boolean isSamplingEnabled() { public SamplerConfiguration getSamplerConfiguration() { return RFileScanner.this.getSamplerConfiguration(); } + + /** + * This method only exists to be used as described in {@link IteratorEnvironment#getPluginEnv()} + * so the table config can be obtained. This simply returns null since a table id does not make + * sense in the context of scanning RFiles, but is needed to obtain the table configuration. + * + * @return null + */ + @Override + public TableId getTableId() { + return null; + } + + @Override + @Deprecated(since = "2.0.0") + public AccumuloConfiguration getConfig() { + return tableConf; + } + + @Override + @Deprecated(since = "2.1.0") + public ServiceEnvironment getServiceEnv() { + return serviceEnvironment.get(); + } + + @Override + public PluginEnvironment getPluginEnv() { + return serviceEnvironment.get(); + } + + private ServiceEnvironment createServiceEnv() { + return new ServiceEnvironment() { + @Override + public T instantiate(TableId tableId, String className, Class base) + throws ReflectiveOperationException { + return instantiate(className, base); + } + + @Override + public T instantiate(String className, Class base) + throws ReflectiveOperationException { + return this.getClass().getClassLoader().loadClass(className).asSubclass(base) + .getDeclaredConstructor().newInstance(); + } + + @Override + public String getTableName(TableId tableId) { + throw new UnsupportedOperationException(errorMsg); + } + + @Override + public Configuration getConfiguration(TableId tableId) { + Preconditions.checkArgument(tableId == getTableId(), + "Expected tableId obtained from IteratorEnvironment.getTableId() but got " + tableId + + " when requesting the table config"); + return new ConfigurationImpl(tableConf); + } + + @Override + public Configuration getConfiguration() { + throw new UnsupportedOperationException(errorMsg); + } + }; + } } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java index a03cc811ab8..ce1bb2d26e7 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map.Entry; +import java.util.function.Supplier; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -73,6 +74,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; +import com.google.common.base.Suppliers; + class OfflineIterator implements Iterator> { static class OfflineIteratorEnvironment implements IteratorEnvironment { @@ -83,6 +86,7 @@ static class OfflineIteratorEnvironment implements IteratorEnvironment { private final SamplerConfiguration sampleConf; private final ClientContext context; private final TableId tableId; + private final Supplier serviceEnvironment; public OfflineIteratorEnvironment(ClientContext context, TableId tableId, Authorizations auths, AccumuloConfiguration acuTableConf, boolean useSample, SamplerConfiguration samplerConf) { @@ -92,6 +96,7 @@ public OfflineIteratorEnvironment(ClientContext context, TableId tableId, Author this.conf = acuTableConf; this.useSample = useSample; this.sampleConf = samplerConf; + this.serviceEnvironment = Suppliers.memoize(() -> new ClientServiceEnvironmentImpl(context)); } @Deprecated(since = "2.0.0") @@ -107,12 +112,14 @@ public IteratorScope getIteratorScope() { @Override public boolean isFullMajorCompaction() { - return false; + throw new IllegalStateException( + "Asked about major compaction type when scope is " + getIteratorScope()); } @Override public boolean isUserCompaction() { - return false; + throw new IllegalStateException( + "Asked about user initiated compaction type when scope is " + getIteratorScope()); } private final ArrayList> topLevelIterators = @@ -160,12 +167,12 @@ public IteratorEnvironment cloneWithSamplingEnabled() { @Deprecated(since = "2.1.0") @Override public ServiceEnvironment getServiceEnv() { - return new ClientServiceEnvironmentImpl(context); + return serviceEnvironment.get(); } @Override public PluginEnvironment getPluginEnv() { - return new ClientServiceEnvironmentImpl(context); + return serviceEnvironment.get(); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java index 372a0e49a30..114e731173b 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java @@ -43,7 +43,7 @@ default SortedKeyValueIterator reserveMapFileReader(String mapFileNam } /** - * @deprecated since 2.0.0. This method was using an unstable non public type. Use + * @deprecated since 2.0.0. This method was using an unstable, non-public type. Use * {@link #getPluginEnv()} */ @Deprecated(since = "2.0.0") @@ -52,16 +52,16 @@ default AccumuloConfiguration getConfig() { } /** - * Return the executed scope of the Iterator. Value will be one of the following: - * {@link IteratorScope#scan}, {@link IteratorScope#minc}, {@link IteratorScope#majc} + * @return the executed scope of the Iterator. Value will be one of the following: + * {@link IteratorScope#scan}, {@link IteratorScope#minc}, {@link IteratorScope#majc} */ default IteratorScope getIteratorScope() { throw new UnsupportedOperationException(); } /** - * Return true if the compaction is a full major compaction. Will throw IllegalStateException if - * {@link #getIteratorScope()} != {@link IteratorScope#majc}. + * @return true if the compaction is a full major compaction; false otherwise + * @throws IllegalStateException if {@link #getIteratorScope()} != {@link IteratorScope#majc}. */ default boolean isFullMajorCompaction() { throw new UnsupportedOperationException(); @@ -76,8 +76,9 @@ default void registerSideChannel(SortedKeyValueIterator iter) { } /** - * Return the Scan Authorizations used in this Iterator. Will throw UnsupportedOperationException - * if {@link #getIteratorScope()} != {@link IteratorScope#scan}. + * @return the Scan Authorizations used in this Iterator. + * @throws UnsupportedOperationException if {@link #getIteratorScope()} != + * {@link IteratorScope#scan}. */ default Authorizations getAuthorizations() { throw new UnsupportedOperationException(); @@ -123,7 +124,7 @@ default IteratorEnvironment cloneWithSamplingEnabled() { * is for a deep copy created with an environment created by calling * {@link #cloneWithSamplingEnabled()} * - * @return true if sampling is enabled for this environment. + * @return true if sampling is enabled for this environment; false otherwise * @since 1.8.0 */ default boolean isSamplingEnabled() { @@ -132,7 +133,7 @@ default boolean isSamplingEnabled() { /** * - * @return sampling configuration is sampling is enabled for environment, otherwise returns null. + * @return sampling configuration if sampling is enabled for environment, otherwise null. * @since 1.8.0 */ default SamplerConfiguration getSamplerConfiguration() { @@ -140,7 +141,8 @@ default SamplerConfiguration getSamplerConfiguration() { } /** - * True if compaction was user initiated. + * @return true if compaction was user initiated; false otherwise + * @throws IllegalStateException if {@link #getIteratorScope()} != {@link IteratorScope#majc}. * * @since 2.0.0 */ @@ -157,7 +159,7 @@ default boolean isUserCompaction() { * * * @since 2.0.0 - * @deprecated since 2.1.0. This method was using a non public API type. Use + * @deprecated since 2.1.0. This method was using a non-public API type. Use * {@link #getPluginEnv()} instead because it has better stability guarantees. */ @Deprecated(since = "2.1.0") @@ -180,7 +182,7 @@ default PluginEnvironment getPluginEnv() { } /** - * Return the table Id associated with this iterator. + * @return the table Id associated with this iterator. * * @since 2.0.0 */ diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java index 63970b2943a..d6522315d2f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java +++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; +import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -232,6 +233,11 @@ public ServiceEnvironment getServiceEnv() { return serviceEnvironment; } + @Override + public PluginEnvironment getPluginEnv() { + return serviceEnvironment; + } + @Override public TableId getTableId() { return tableId; diff --git a/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java b/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java index 05cc7a3a601..00e0ce25c38 100644 --- a/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java +++ b/test/src/main/java/org/apache/accumulo/test/IteratorEnvIT.java @@ -19,34 +19,45 @@ package org.apache.accumulo.test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.File; import java.io.IOException; import java.time.Duration; import java.util.Collections; import java.util.EnumSet; -import java.util.Iterator; import java.util.Map; +import java.util.TreeMap; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.client.rfile.RFileWriter; +import org.apache.accumulo.core.clientImpl.OfflineScanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -71,13 +82,15 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoo /** * Basic scan iterator to test IteratorEnvironment returns what is expected. */ - public static class ScanIter extends WrappingIterator { + public static class ScanIter extends Filter { IteratorScope scope = IteratorScope.scan; + String badColFam; @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { super.init(source, options, env); + this.badColFam = options.get("bad.col.fam"); testEnv(scope, options, env); // Checking for compaction on a scan should throw an error. @@ -92,18 +105,27 @@ public void init(SortedKeyValueIterator source, Map op "Test failed - Expected to throw IllegalStateException when checking compaction on a scan."); } catch (IllegalStateException e) {} } + + @Override + public boolean accept(Key k, Value v) { + // The only reason for filtering out some data is as a way to verify init() and testEnv() + // have been called + return !k.getColumnFamily().toString().equals(badColFam); + } } /** * Basic compaction iterator to test IteratorEnvironment returns what is expected. */ - public static class MajcIter extends WrappingIterator { + public static class MajcIter extends Filter { IteratorScope scope = IteratorScope.majc; + String badColFam; @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { super.init(source, options, env); + this.badColFam = options.get("bad.col.fam"); testEnv(scope, options, env); try { env.isUserCompaction(); @@ -116,18 +138,27 @@ public void init(SortedKeyValueIterator source, Map op throw new RuntimeException("Test failed"); } } + + @Override + public boolean accept(Key k, Value v) { + // The only reason for filtering out some data is as a way to verify init() and testEnv() + // have been called + return !k.getColumnFamily().toString().equals(badColFam); + } } /** * */ - public static class MincIter extends WrappingIterator { + public static class MincIter extends Filter { IteratorScope scope = IteratorScope.minc; + String badColFam; @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { super.init(source, options, env); + this.badColFam = options.get("bad.col.fam"); testEnv(scope, options, env); try { env.isUserCompaction(); @@ -140,6 +171,13 @@ public void init(SortedKeyValueIterator source, Map op "Test failed - Expected to throw IllegalStateException when checking compaction on a scan."); } catch (IllegalStateException e) {} } + + @Override + public boolean accept(Key k, Value v) { + // The only reason for filtering out some data is as a way to verify init() and testEnv() + // have been called + return !k.getColumnFamily().toString().equals(badColFam); + } } /** @@ -147,7 +185,8 @@ public void init(SortedKeyValueIterator source, Map op */ private static void testEnv(IteratorScope scope, Map opts, IteratorEnvironment env) { - TableId expectedTableId = TableId.of(opts.get("expected.table.id")); + String expTableIdStr = opts.get("expected.table.id"); + TableId expTableId = expTableIdStr == null ? null : TableId.of(expTableIdStr); // verify getServiceEnv() and getPluginEnv() are the same objects, // so further checks only need to use getPluginEnv() @@ -172,9 +211,14 @@ private static void testEnv(IteratorScope scope, Map opts, if (!"value1".equals(tableConf.getTableCustom("iterator.env.test"))) { throw new RuntimeException("Test failed - Expected table property not found in table conf."); } - var systemConf = pluginEnv.getConfiguration(); - if (systemConf.get("table.custom.iterator.env.test") != null) { - throw new RuntimeException("Test failed - Unexpected table property found in system conf."); + if (!env.getClass().getName().contains("RFileScanner$IterEnv")) { + var systemConf = pluginEnv.getConfiguration(); + if (systemConf.get("table.custom.iterator.env.test") != null) { + throw new RuntimeException("Test failed - Unexpected table property found in system conf."); + } + } else { + // We expect RFileScanner's IterEnv to throw an UOE + assertThrows(UnsupportedOperationException.class, pluginEnv::getConfiguration); } // check other environment settings @@ -184,7 +228,7 @@ private static void testEnv(IteratorScope scope, Map opts, if (env.isSamplingEnabled()) { throw new RuntimeException("Test failed - isSamplingEnabled returned true, expected false"); } - if (!expectedTableId.equals(env.getTableId())) { + if (expTableId != null && !expTableId.equals(env.getTableId())) { throw new RuntimeException("Test failed - Error getting Table ID"); } } @@ -203,10 +247,14 @@ public void finish() { @Test public void test() throws Exception { - String[] tables = getUniqueNames(3); + String[] tables = getUniqueNames(5); testScan(tables[0], ScanIter.class); - testCompact(tables[1], MajcIter.class); - testMinCompact(tables[2], MincIter.class); + // No table id when scanning at file level + testRFileScan(ScanIter.class); + testOfflineScan(tables[1], ScanIter.class); + testClientSideScan(tables[2], ScanIter.class); + testCompact(tables[3], MajcIter.class); + testMinCompact(tables[4], MincIter.class); } private void testScan(String tableName, @@ -215,10 +263,62 @@ private void testScan(String tableName, IteratorSetting cfg = new IteratorSetting(1, iteratorClass); cfg.addOption("expected.table.id", client.tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); + try (Scanner scan = client.createScanner(tableName)) { scan.addScanIterator(cfg); - Iterator> iter = scan.iterator(); - iter.forEachRemaining(e -> assertEquals("cf1", e.getKey().getColumnFamily().toString())); + validateScanner(scan); + } + } + + private void testRFileScan(Class> iteratorClass) + throws Exception { + TreeMap data = createTestData(); + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + String rFilePath = createRFile(fs, data); + + IteratorSetting cfg = new IteratorSetting(1, iteratorClass); + cfg.addOption("bad.col.fam", "badcf"); + + try (Scanner scan = RFile.newScanner().from(rFilePath).withFileSystem(fs) + .withTableProperties(getTableConfig().getProperties()).build()) { + scan.addScanIterator(cfg); + validateScanner(scan); + } + } + + public void testOfflineScan(String tableName, + Class> iteratorClass) throws Exception { + writeData(tableName); + + TableId tableId = getServerContext().getTableId(tableName); + getServerContext().tableOperations().offline(tableName, true); + + IteratorSetting cfg = new IteratorSetting(1, iteratorClass); + cfg.addOption("expected.table.id", + getServerContext().tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); + + try (OfflineScanner scan = + new OfflineScanner(getServerContext(), tableId, new Authorizations())) { + scan.addScanIterator(cfg); + validateScanner(scan); + } + } + + public void testClientSideScan(String tableName, + Class> iteratorClass) throws Exception { + writeData(tableName); + + IteratorSetting cfg = new IteratorSetting(1, iteratorClass); + cfg.addOption("expected.table.id", + getServerContext().tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); + + try (Scanner scan = client.createScanner(tableName); + var clientIterScan = new ClientSideIteratorScanner(scan)) { + clientIterScan.addScanIterator(cfg); + validateScanner(clientIterScan); } } @@ -228,9 +328,14 @@ public void testCompact(String tableName, IteratorSetting cfg = new IteratorSetting(1, iteratorClass); cfg.addOption("expected.table.id", client.tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); CompactionConfig config = new CompactionConfig(); config.setIterators(Collections.singletonList(cfg)); client.tableOperations().compact(tableName, config); + + try (Scanner scan = client.createScanner(tableName)) { + validateScanner(scan); + } } public void testMinCompact(String tableName, @@ -239,10 +344,15 @@ public void testMinCompact(String tableName, IteratorSetting cfg = new IteratorSetting(1, iteratorClass); cfg.addOption("expected.table.id", client.tableOperations().tableIdMap().get(tableName)); + cfg.addOption("bad.col.fam", "badcf"); client.tableOperations().attachIterator(tableName, cfg, EnumSet.of(IteratorScope.minc)); - client.tableOperations().flush(tableName); + client.tableOperations().flush(tableName, null, null, true); + + try (Scanner scan = client.createScanner(tableName)) { + validateScanner(scan); + } } private NewTableConfiguration getTableConfig() { @@ -255,15 +365,51 @@ private void writeData(String tableName) throws Exception { client.tableOperations().create(tableName, getTableConfig()); try (BatchWriter bw = client.createBatchWriter(tableName)) { - Mutation m = new Mutation("row1"); - m.at().family("cf1").qualifier("cq1").put("val1"); - bw.addMutation(m); - m = new Mutation("row2"); - m.at().family("cf1").qualifier("cq1").put("val2"); - bw.addMutation(m); - m = new Mutation("row3"); - m.at().family("cf1").qualifier("cq1").put("val3"); - bw.addMutation(m); + for (Map.Entry data : createTestData().entrySet()) { + Mutation m = new Mutation(data.getKey().getRow()); + m.at().family(data.getKey().getColumnFamily()).qualifier(data.getKey().getColumnQualifier()) + .put(data.getValue()); + bw.addMutation(m); + } + } + } + + private TreeMap createTestData() { + TreeMap testData = new TreeMap<>(); + + // Write data that we do not expect to be filtered out + testData.put(new Key("row1", "cf1", "cq1"), new Value("val1")); + testData.put(new Key("row2", "cf1", "cq1"), new Value("val2")); + testData.put(new Key("row3", "cf1", "cq1"), new Value("val3")); + // Write data that we expect to be filtered out + testData.put(new Key("row4", "badcf", "badcq"), new Value("val1")); + testData.put(new Key("row5", "badcf", "badcq"), new Value("val2")); + testData.put(new Key("row6", "badcf", "badcq"), new Value("val3")); + + return testData; + } + + private String createRFile(FileSystem fs, TreeMap data) throws Exception { + File dir = new File(System.getProperty("user.dir") + "/target/rfilescan-iterenv-test"); + assertTrue(dir.mkdirs()); + String filePath = dir.getAbsolutePath() + "/test.rf"; + + try (RFileWriter writer = RFile.newWriter().to(filePath).withFileSystem(fs).build()) { + writer.append(data.entrySet()); + } + + fs.deleteOnExit(new Path(dir.getAbsolutePath())); + + return filePath; + } + + private void validateScanner(Scanner scan) { + // Ensure the badcf was filtered out to ensure init() and testEnv() were called + int numElts = 0; + for (var e : scan) { + numElts++; + assertEquals("cf1", e.getKey().getColumnFamily().toString()); } + assertEquals(3, numElts); } }