Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl;
import org.apache.accumulo.core.clientImpl.ScannerImpl;
import org.apache.accumulo.core.clientImpl.ScannerOptions;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
Expand All @@ -55,6 +57,8 @@
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
import org.apache.hadoop.io.Text;

import com.google.common.base.Suppliers;

/**
* A scanner that instantiates iterators on the client side instead of on the tablet server. This
* can be useful for testing iterators or in cases where you don't want iterators affecting the
Expand Down Expand Up @@ -91,10 +95,13 @@ private class ClientSideIteratorEnvironment implements IteratorEnvironment {

private final SamplerConfiguration samplerConfig;
private final boolean sampleEnabled;
private final Supplier<ServiceEnvironment> serviceEnvironment;

/**
 * Creates an iterator environment for client-side iteration.
 *
 * @param sampleEnabled value stored in the {@code sampleEnabled} field
 * @param samplerConfig value stored in the {@code samplerConfig} field
 */
ClientSideIteratorEnvironment(boolean sampleEnabled, SamplerConfiguration samplerConfig) {
  // The service environment is built on first request and then reused (memoized supplier).
  this.serviceEnvironment =
      Suppliers.memoize(() -> new ClientServiceEnvironmentImpl(context.get()));
  this.samplerConfig = samplerConfig;
  this.sampleEnabled = sampleEnabled;
}

@Override
Expand All @@ -111,7 +118,8 @@ public boolean isFullMajorCompaction() {

/**
 * Always fails; this environment does not answer whether a compaction was user initiated.
 *
 * @return never returns normally
 * @throws IllegalStateException always; the message reports the value of
 *         {@code getIteratorScope()}
 */
@Override
public boolean isUserCompaction() {
  throw new IllegalStateException(
      "Asked about user initiated compaction type when scope is " + getIteratorScope());
}

@Override
Expand All @@ -134,15 +142,21 @@ public SamplerConfiguration getSamplerConfiguration() {
return samplerConfig;
}

/**
 * Looks up the configuration for {@code getTableId()} through the plugin environment and wraps
 * it in a {@link ConfigurationCopy}.
 *
 * @return a new {@link ConfigurationCopy} built from the plugin environment's configuration
 */
@Override
@Deprecated(since = "2.0.0")
public AccumuloConfiguration getConfig() {
  PluginEnvironment.Configuration tableProps = getPluginEnv().getConfiguration(getTableId());
  return new ConfigurationCopy(tableProps);
}

/**
 * Returns the service environment held by the memoized {@code serviceEnvironment} supplier, so
 * repeated calls share one instance instead of constructing a new one each time.
 *
 * @return the value of the {@code serviceEnvironment} supplier
 */
@Deprecated(since = "2.1.0")
@Override
public ServiceEnvironment getServiceEnv() {
  return serviceEnvironment.get();
}

/**
 * Returns the plugin environment, which is the same object handed out by the memoized
 * {@code serviceEnvironment} supplier; no new instance is constructed per call.
 *
 * @return the value of the {@code serviceEnvironment} supplier
 */
@Override
public PluginEnvironment getPluginEnv() {
  return serviceEnvironment.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map.Entry;
import java.util.function.Predicate;

import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
Expand All @@ -35,6 +36,8 @@
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -172,17 +175,25 @@ public interface ScannerOptions {
* {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto
* properties could be passed here.
*
* <p>
* Configured iterators will have access to these properties via the
* {@link PluginEnvironment#getConfiguration(TableId)} (obtained by
* {@link IteratorEnvironment#getPluginEnv()}). The tableId used to get the configuration should
* be the one returned programmatically from {@link IteratorEnvironment#getTableId()}.
*
* @param props iterable over Accumulo table key value properties.
* @return this
*/
ScannerOptions withTableProperties(Iterable<Entry<String,String>> props);

/**
 * Variant of {@link #withTableProperties(Iterable)} that takes the properties as a map. Any
 * property that impacts file behavior, regardless of whether it has the
 * {@link Property#TABLE_PREFIX}, may be accepted and used. For example, cache and crypto
 * properties could be passed here.
 *
 * @param props a map instead of an Iterable
 * @return this
 * @see #withTableProperties(Iterable)
 */
ScannerOptions withTableProperties(Map<String,String> props);

Expand Down Expand Up @@ -260,11 +271,13 @@ public interface SummaryOptions {
SummaryOptions withTableProperties(Iterable<Entry<String,String>> props);

/**
 * Variant of {@link #withTableProperties(Iterable)} that takes the properties as a map. Any
 * property that impacts file behavior, regardless of whether it has the
 * {@link Property#TABLE_PREFIX}, may be accepted and used. For example, cache and crypto
 * properties could be passed here.
 *
 * @param props a map instead of an Iterable
 * @return this
 * @see #withTableProperties(Iterable)
 */
SummaryOptions withTableProperties(Map<String,String> props);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.function.Supplier;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
Expand All @@ -44,6 +45,7 @@
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
Expand All @@ -66,19 +68,24 @@
import org.apache.accumulo.core.spi.cache.BlockCacheManager;
import org.apache.accumulo.core.spi.cache.CacheEntry;
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.util.ConfigurationImpl;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;

class RFileScanner extends ScannerOptions implements Scanner {

private static final byte[] EMPTY_BYTES = new byte[0];
private static final Range EMPTY_RANGE = new Range();

// Message for UnsupportedOperationException thrown by operations that would need a table or an
// Accumulo instance, neither of which exists for a scanner that reads files directly.
private static final String errorMsg =
"This scanner is unrelated to any table or accumulo instance;"
+ " it operates directly on files. Therefore, it can not support this operation.";
private Range range;
private BlockCacheManager blockCacheManager = null;
private BlockCache dataCache = null;
Expand Down Expand Up @@ -311,14 +318,27 @@ public void updateScanIteratorOption(String iteratorName, String key, String val
}

private class IterEnv implements IteratorEnvironment {
private final Supplier<ServiceEnvironment> serviceEnvironment;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What prevents us from just using the implementation from super.getPluginEnv() and super.getServiceEnv()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IteratorEnvironment doesn't have any implementation for getServiceEnv() or getPluginEnv(). Or are you suggesting changing IteratorEnvironment?


/**
 * Sets up the memoized supplier so that {@code createServiceEnv()} runs on first request and its
 * result is reused afterwards.
 */
private IterEnv() {
  this.serviceEnvironment = Suppliers.memoize(() -> createServiceEnv());
}

/**
 * Reports the scope iterators run in for this environment.
 *
 * @return always {@link IteratorScope#scan}
 */
@Override
public IteratorScope getIteratorScope() {
return IteratorScope.scan;
}

/**
 * Always fails; this environment's scope is scan, so asking about the major compaction type is
 * treated as a caller error.
 *
 * @return never returns normally
 * @throws IllegalStateException always; the message reports the value of
 *         {@code getIteratorScope()}
 */
@Override
public boolean isFullMajorCompaction() {
  throw new IllegalStateException(
      "Asked about major compaction type when scope is " + getIteratorScope());
}

/**
 * Always fails; this environment's scope is scan, so asking whether a compaction was user
 * initiated is treated as a caller error.
 *
 * @return never returns normally
 * @throws IllegalStateException always; the message reports the value of
 *         {@code getIteratorScope()}
 */
@Override
public boolean isUserCompaction() {
  final String reason =
      "Asked about user initiated compaction type when scope is " + getIteratorScope();
  throw new IllegalStateException(reason);
}

@Override
Expand All @@ -335,6 +355,70 @@ public boolean isSamplingEnabled() {
public SamplerConfiguration getSamplerConfiguration() {
return RFileScanner.this.getSamplerConfiguration();
}

/**
 * Returns the table id to pass to {@link PluginEnvironment#getConfiguration(TableId)} when
 * looking up the table configuration, as described in {@link IteratorEnvironment#getPluginEnv()}.
 * A table id does not make sense when scanning RFiles directly, so the id is always null; the
 * method exists only so that lookup pattern keeps working.
 *
 * @return null
 */
@Override
public TableId getTableId() {
return null;
}

/**
 * Returns {@code tableConf} unchanged (the field is declared outside this excerpt).
 *
 * @return the {@code tableConf} configuration object
 */
@Override
@Deprecated(since = "2.0.0")
public AccumuloConfiguration getConfig() {
return tableConf;
}

/**
 * Returns the service environment held by the {@code serviceEnvironment} supplier, which the
 * constructor wraps with {@code Suppliers.memoize}.
 *
 * @return the value of the {@code serviceEnvironment} supplier
 */
@Override
@Deprecated(since = "2.1.0")
public ServiceEnvironment getServiceEnv() {
return serviceEnvironment.get();
}

/**
 * Returns the plugin environment, which is the same object handed out by the
 * {@code serviceEnvironment} supplier.
 *
 * @return the value of the {@code serviceEnvironment} supplier
 */
@Override
public PluginEnvironment getPluginEnv() {
return serviceEnvironment.get();
}

private ServiceEnvironment createServiceEnv() {
return new ServiceEnvironment() {
/**
 * Instantiates {@code className} as a {@code base}. The {@code tableId} argument is ignored;
 * the call is forwarded to {@code instantiate(String, Class)}.
 */
@Override
public <T> T instantiate(TableId tableId, String className, Class<T> base)
throws ReflectiveOperationException {
return instantiate(className, base);
}

/**
 * Loads {@code className} with the class loader of this object's class, checks that it is a
 * subclass of {@code base}, and creates an instance through its no-argument constructor.
 *
 * @param className name of the class to load
 * @param base type the loaded class must be a subclass of
 * @return a new instance of the loaded class
 * @throws ReflectiveOperationException if loading the class or invoking its constructor fails
 */
@Override
public <T> T instantiate(String className, Class<T> base)
    throws ReflectiveOperationException {
  ClassLoader loader = this.getClass().getClassLoader();
  Class<? extends T> implementation = loader.loadClass(className).asSubclass(base);
  return implementation.getDeclaredConstructor().newInstance();
}

/**
 * Not supported by this environment.
 *
 * @throws UnsupportedOperationException always, with {@code errorMsg} as the message
 */
@Override
public String getTableName(TableId tableId) {
throw new UnsupportedOperationException(errorMsg);
}

@Override
public Configuration getConfiguration(TableId tableId) {
Preconditions.checkArgument(tableId == getTableId(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getTableId above only returns null.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the intention was that the only valid use of this method is:
env.getPluginEnv().getConfiguration(env.getTableId())
(as described in javadocs for getPluginEnv())
env.getPluginEnv().getConfiguration(null)
would also work, and this still makes sense in the context of RFileScanner

Just wanted to avoid accepting something like
env.getPluginEnv().getConfiguration(someRandomTableId)

"Expected tableId obtained from IteratorEnvironment.getTableId() but got " + tableId
+ " when requesting the table config");
return new ConfigurationImpl(tableConf);
}

/**
 * Not supported by this environment; only the overload that takes a table id returns a
 * configuration.
 *
 * @throws UnsupportedOperationException always, with {@code errorMsg} as the message
 */
@Override
public Configuration getConfiguration() {
throw new UnsupportedOperationException(errorMsg);
}
};
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
Expand Down Expand Up @@ -73,6 +74,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;

import com.google.common.base.Suppliers;

class OfflineIterator implements Iterator<Entry<Key,Value>> {

static class OfflineIteratorEnvironment implements IteratorEnvironment {
Expand All @@ -83,6 +86,7 @@ static class OfflineIteratorEnvironment implements IteratorEnvironment {
private final SamplerConfiguration sampleConf;
private final ClientContext context;
private final TableId tableId;
private final Supplier<ServiceEnvironment> serviceEnvironment;

public OfflineIteratorEnvironment(ClientContext context, TableId tableId, Authorizations auths,
AccumuloConfiguration acuTableConf, boolean useSample, SamplerConfiguration samplerConf) {
Expand All @@ -92,6 +96,7 @@ public OfflineIteratorEnvironment(ClientContext context, TableId tableId, Author
this.conf = acuTableConf;
this.useSample = useSample;
this.sampleConf = samplerConf;
this.serviceEnvironment = Suppliers.memoize(() -> new ClientServiceEnvironmentImpl(context));
}

@Deprecated(since = "2.0.0")
Expand All @@ -107,12 +112,14 @@ public IteratorScope getIteratorScope() {

/**
 * Always fails; this environment does not answer questions about the major compaction type.
 *
 * @return never returns normally
 * @throws IllegalStateException always; the message reports the value of
 *         {@code getIteratorScope()}
 */
@Override
public boolean isFullMajorCompaction() {
  throw new IllegalStateException(
      "Asked about major compaction type when scope is " + getIteratorScope());
}

/**
 * Always fails; this environment does not answer whether a compaction was user initiated.
 *
 * @return never returns normally
 * @throws IllegalStateException always; the message reports the value of
 *         {@code getIteratorScope()}
 */
@Override
public boolean isUserCompaction() {
  throw new IllegalStateException(
      "Asked about user initiated compaction type when scope is " + getIteratorScope());
}

private final ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators =
Expand Down Expand Up @@ -160,12 +167,12 @@ public IteratorEnvironment cloneWithSamplingEnabled() {
/**
 * Returns the service environment held by the memoized {@code serviceEnvironment} supplier, so
 * repeated calls share one instance instead of constructing a new one each time.
 *
 * @return the value of the {@code serviceEnvironment} supplier
 */
@Deprecated(since = "2.1.0")
@Override
public ServiceEnvironment getServiceEnv() {
  return serviceEnvironment.get();
}

/**
 * Returns the plugin environment, which is the same object handed out by the memoized
 * {@code serviceEnvironment} supplier; no new instance is constructed per call.
 *
 * @return the value of the {@code serviceEnvironment} supplier
 */
@Override
public PluginEnvironment getPluginEnv() {
  return serviceEnvironment.get();
}

@Override
Expand Down
Loading