diff --git a/contrib/pom.xml b/contrib/pom.xml
index 2f3ac9fa6e4..76d8369c71e 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -35,6 +35,7 @@
storage-hbase
storage-hive
storage-mongo
+ storage-phoenix
storage-jdbc
sqlline
data
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java
index 020d35aafba..9dde6e49289 100755
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java
@@ -17,19 +17,6 @@
*/
package org.apache.drill.exec.store.phoenix;
-import java.math.BigDecimal;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Time;
-import java.sql.Timestamp;
-
-import javax.sql.DataSource;
-
-import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -54,7 +41,17 @@
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.base.Charsets;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.SQLCloseable;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -64,23 +61,24 @@ class PhoenixRecordReader extends AbstractRecordReader {
.getLogger(PhoenixRecordReader.class);
private static final ImmutableMap JDBC_TYPE_MAPPINGS;
- private final DataSource source;
- private ResultSet resultSet;
+
+ private static final long MILLIS_IN_DAY = 1000L * 60 * 60 * 24;
+
private final String storagePluginName;
- private FragmentContext fragmentContext;
- private Connection connection;
- private Statement statement;
+ private final KeyValueSchema kvSchema;
+ private final ResultIterator result;
private final String name;
private ImmutableList vectors;
private ImmutableList> copiers;
- private OperatorContext operatorContext;
+ // workspace for each row
+ private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- public PhoenixRecordReader(FragmentContext fragmentContext, DataSource source, String name, String storagePluginName) {
- this.fragmentContext = fragmentContext;
- this.source = source;
+ public PhoenixRecordReader(FragmentContext fragmentContext, ResultIterator result, KeyValueSchema kvSchema, String name, String storagePluginName) {
+ this.result = result;
this.name = name;
this.storagePluginName = storagePluginName;
+ this.kvSchema = kvSchema;
}
static {
@@ -116,28 +114,27 @@ public PhoenixRecordReader(FragmentContext fragmentContext, DataSource source, S
.build();
}
- private Copier> getCopier(int jdbcType, int offset, ResultSet result, ValueVector v) {
-
+ private Copier> getCopier(int offset, ValueVector v) {
if (v instanceof NullableBigIntVector) {
- return new BigIntCopier(offset, result, (NullableBigIntVector.Mutator) v.getMutator());
+ return new BigIntCopier(offset, (NullableBigIntVector.Mutator) v.getMutator());
} else if (v instanceof NullableFloat4Vector) {
- return new Float4Copier(offset, result, (NullableFloat4Vector.Mutator) v.getMutator());
+ return new Float4Copier(offset, (NullableFloat4Vector.Mutator) v.getMutator());
} else if (v instanceof NullableFloat8Vector) {
- return new Float8Copier(offset, result, (NullableFloat8Vector.Mutator) v.getMutator());
+ return new Float8Copier(offset, (NullableFloat8Vector.Mutator) v.getMutator());
} else if (v instanceof NullableIntVector) {
- return new IntCopier(offset, result, (NullableIntVector.Mutator) v.getMutator());
+ return new IntCopier(offset, (NullableIntVector.Mutator) v.getMutator());
} else if (v instanceof NullableVarCharVector) {
- return new VarCharCopier(offset, result, (NullableVarCharVector.Mutator) v.getMutator());
+ return new VarCharCopier(offset, (NullableVarCharVector.Mutator) v.getMutator());
} else if (v instanceof NullableVarBinaryVector) {
- return new VarBinaryCopier(offset, result, (NullableVarBinaryVector.Mutator) v.getMutator());
+ return new VarBinaryCopier(offset, (NullableVarBinaryVector.Mutator) v.getMutator());
} else if (v instanceof NullableDateVector) {
- return new DateCopier(offset, result, (NullableDateVector.Mutator) v.getMutator());
+ return new DateCopier(offset, (NullableDateVector.Mutator) v.getMutator());
} else if (v instanceof NullableTimeVector) {
- return new TimeCopier(offset, result, (NullableTimeVector.Mutator) v.getMutator());
+ return new TimeCopier(offset, (NullableTimeVector.Mutator) v.getMutator());
} else if (v instanceof NullableTimeStampVector) {
- return new TimeStampCopier(offset, result, (NullableTimeStampVector.Mutator) v.getMutator());
+ return new TimeStampCopier(offset, (NullableTimeStampVector.Mutator) v.getMutator());
} else if (v instanceof NullableBitVector) {
- return new BitCopier(offset, result, (NullableBitVector.Mutator) v.getMutator());
+ return new BitCopier(offset, (NullableBitVector.Mutator) v.getMutator());
}
throw new IllegalArgumentException("Unknown how to handle vector.");
@@ -146,29 +143,19 @@ private Copier> getCopier(int jdbcType, int offset, ResultSet result, ValueVec
@Override
public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
try {
-
- this.operatorContext = operatorContext;
- connection = source.getConnection();
- statement = connection.createStatement();
- resultSet = statement.executeQuery("select * from " + name);
-
- final ResultSetMetaData meta = resultSet.getMetaData();
- final int columns = meta.getColumnCount();
ImmutableList.Builder vectorBuilder = ImmutableList.builder();
ImmutableList.Builder> copierBuilder = ImmutableList.builder();
- for (int i = 1; i <= columns; i++) {
- final String name = meta.getColumnLabel(i);
- final int jdbcType = meta.getColumnType(i);
- final int width = meta.getPrecision(i);
- final int scale = meta.getScale(i);
- MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
+ int i = 0;
+ for (ValueSchema.Field phoenixField : kvSchema.getFields()) {
+ final PDataType phoenixType = phoenixField.getDataType();
+ MinorType minorType = JDBC_TYPE_MAPPINGS.get(phoenixType.getSqlType());
if (minorType == null) {
throw UserException.dataReadError()
- .message("The JDBC storage plugin failed while trying to execute a query. "
- + "The JDBC data type %d is not currently supported.", jdbcType)
-
- .addContext("sql", name)
+ .message(
+ "The JDBC storage plugin failed while trying to execute a query. "
+ + "The JDBC data type %d is not currently supported.",
+ phoenixType)
.addContext("plugin", storagePluginName)
.build(logger);
}
@@ -179,17 +166,16 @@ public void setup(OperatorContext operatorContext, OutputMutator output) throws
minorType, type.getMode());
ValueVector vector = output.addField(field, clazz);
vectorBuilder.add(vector);
- copierBuilder.add(getCopier(jdbcType, i, resultSet, vector));
-
+ copierBuilder.add(getCopier(i++, vector));
}
vectors = vectorBuilder.build();
copiers = copierBuilder.build();
- } catch (SQLException | SchemaChangeException e) {
+ } catch (SchemaChangeException e) {
throw UserException.dataReadError(e)
.message("The JDBC storage plugin failed while trying setup the SQL query. ")
- .addContext("sql", name)
+ .addContext("name", name)
.addContext("plugin", storagePluginName)
.build(logger);
}
@@ -199,20 +185,40 @@ public void setup(OperatorContext operatorContext, OutputMutator output) throws
@Override
public int next() {
int counter = 0;
- Boolean b = true;
try {
- while (counter < 4095 && b == true) { // loop at 4095 since nullables use one more than record count and we
- // allocate on powers of two.
- b = resultSet.next();
- if(b == false) {
+ final ValueBitSet valueSet = ValueBitSet.newInstance(kvSchema);
+ for (;;) {
+ Tuple tuple = result.next();
+ if(tuple == null) {
break;
}
- for (Copier> c : copiers) {
- c.copy(counter);
+
+ final Cell value = tuple.getValue(0);
+ ptr.set(value.getValueArray(),
+ value.getValueOffset(),
+ value.getValueLength());
+ valueSet.clear();
+ valueSet.or(ptr);
+
+ final int maxOffset = ptr.getOffset() + ptr.getLength();
+ kvSchema.iterator(ptr);
+ for (int i = 0;; i++) {
+ final Boolean hasValue = kvSchema.next(ptr, i, maxOffset, valueSet);
+ if (hasValue == null) {
+ break;
+ }
+ final Copier> copier = copiers.get(i);
+ if (hasValue) {
+ copier.copy(counter);
+ }
+ }
+ if (++counter == 4095) {
+ // loop at 4095 since nullables use one more than record count and we
+ // allocate on powers of two.
+ break;
}
- counter++;
}
- } catch (SQLException e) {
+ } catch (Exception e) {
throw UserException
.dataReadError(e)
.message("Failure while attempting to read from database.")
@@ -225,72 +231,77 @@ public int next() {
vv.getMutator().setValueCount(counter > 0 ? counter : 0);
}
- return counter>0 ? counter : 0;
+ return counter > 0 ? counter : 0;
}
@Override
public void cleanup() {
- AutoCloseables.close(resultSet, logger);
- AutoCloseables.close(statement, logger);
- AutoCloseables.close(connection, logger);
+ close(result, logger);
+ }
+
+ /** Similar to {@link org.apache.drill.common.AutoCloseables#close}. */
+ public static void close(final SQLCloseable ac, final org.slf4j.Logger logger) {
+ if (ac == null) {
+ return;
+ }
+
+ try {
+ ac.close();
+ } catch(Exception e) {
+ logger.warn("Failure on close(): {}", e);
+ }
}
private abstract class Copier {
protected final int columnIndex;
- protected final ResultSet result;
+ protected final PDataType.PDataCodec codec;
+ protected final SortOrder sortOrder;
protected final T mutator;
- public Copier(int columnIndex, ResultSet result, T mutator) {
+ public Copier(int columnIndex, T mutator) {
super();
this.columnIndex = columnIndex;
- this.result = result;
+ final ValueSchema.Field field = kvSchema.getField(columnIndex);
+ this.codec = field.getDataType().getCodec();
this.mutator = mutator;
+ this.sortOrder = field.getSortOrder();
}
- abstract void copy(int index) throws SQLException;
+ abstract void copy(int index);
}
private class IntCopier extends Copier {
- public IntCopier(int offset, ResultSet set, NullableIntVector.Mutator mutator) {
- super(offset, set, mutator);
+ public IntCopier(int offset, NullableIntVector.Mutator mutator) {
+ super(offset, mutator);
}
@Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getInt(columnIndex));
- if (result.wasNull()) {
- mutator.setNull(index);
- }
+ void copy(int index) {
+ mutator.setSafe(index, codec.decodeInt(ptr, sortOrder));
}
}
private class BigIntCopier extends Copier {
- public BigIntCopier(int offset, ResultSet set, NullableBigIntVector.Mutator mutator) {
- super(offset, set, mutator);
+ public BigIntCopier(int offset, NullableBigIntVector.Mutator mutator) {
+ super(offset, mutator);
}
@Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getLong(columnIndex));
- if (result.wasNull()) {
- mutator.setNull(index);
- }
+ void copy(int index) {
+ mutator.setSafe(index, codec.decodeLong(ptr, sortOrder));
}
}
private class Float4Copier extends Copier {
- public Float4Copier(int columnIndex, ResultSet result, NullableFloat4Vector.Mutator mutator) {
- super(columnIndex, result, mutator);
+ public Float4Copier(int columnIndex, NullableFloat4Vector.Mutator mutator) {
+ super(columnIndex, mutator);
}
@Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getFloat(columnIndex));
- if (result.wasNull()) {
- mutator.setNull(index);
- }
+ void copy(int index) {
+ mutator.setSafe(index, codec.decodeFloat(ptr, sortOrder));
}
}
@@ -298,132 +309,90 @@ void copy(int index) throws SQLException {
private class Float8Copier extends Copier {
- public Float8Copier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
- super(columnIndex, result, mutator);
+ public Float8Copier(int columnIndex, NullableFloat8Vector.Mutator mutator) {
+ super(columnIndex, mutator);
}
@Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getDouble(columnIndex));
- if (result.wasNull()) {
- mutator.setNull(index);
- }
-
- }
-
- }
-
- private class DecimalCopier extends Copier {
-
- public DecimalCopier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- BigDecimal decimal = result.getBigDecimal(columnIndex);
- if (decimal != null) {
- mutator.setSafe(index, decimal.doubleValue());
- }
+ void copy(int index) {
+ mutator.setSafe(index, codec.decodeDouble(ptr, sortOrder));
}
}
-
+
private class VarCharCopier extends Copier {
- public VarCharCopier(int columnIndex, ResultSet result, NullableVarCharVector.Mutator mutator) {
- super(columnIndex, result, mutator);
+ public VarCharCopier(int columnIndex, NullableVarCharVector.Mutator mutator) {
+ super(columnIndex, mutator);
}
@Override
- void copy(int index) throws SQLException {
- String val = resultSet.getString(columnIndex);
- if (val != null) {
- byte[] record = val.getBytes(Charsets.UTF_8);
- mutator.setSafe(index, record, 0, record.length);
- }
+ void copy(int index) {
+ mutator.setSafe(index, ptr.get(), ptr.getOffset(), ptr.getLength());
}
}
private class VarBinaryCopier extends Copier {
- public VarBinaryCopier(int columnIndex, ResultSet result, NullableVarBinaryVector.Mutator mutator) {
- super(columnIndex, result, mutator);
+ public VarBinaryCopier(int columnIndex, NullableVarBinaryVector.Mutator mutator) {
+ super(columnIndex, mutator);
}
@Override
- void copy(int index) throws SQLException {
- byte[] record = result.getBytes(columnIndex);
- if (record != null) {
- mutator.setSafe(index, record, 0, record.length);
- }
+ void copy(int index) {
+ mutator.setSafe(index, ptr.get(), ptr.getOffset(), ptr.getLength());
}
}
private class DateCopier extends Copier {
- public DateCopier(int columnIndex, ResultSet result, NullableDateVector.Mutator mutator) {
- super(columnIndex, result, mutator);
+ public DateCopier(int columnIndex, NullableDateVector.Mutator mutator) {
+ super(columnIndex, mutator);
}
@Override
- void copy(int index) throws SQLException {
- Date date = result.getDate(columnIndex);
- if (date != null) {
- mutator.setSafe(index, date.getTime());
- }
+ void copy(int index) {
+ mutator.setSafe(index, codec.decodeLong(ptr, sortOrder));
}
}
private class TimeCopier extends Copier {
-
- public TimeCopier(int columnIndex, ResultSet result, NullableTimeVector.Mutator mutator) {
- super(columnIndex, result, mutator);
+ public TimeCopier(int columnIndex, NullableTimeVector.Mutator mutator) {
+ super(columnIndex, mutator);
}
@Override
- void copy(int index) throws SQLException {
- Time time = result.getTime(columnIndex);
- if (time != null) {
- mutator.setSafe(index, (int) time.getTime());
- }
-
+ void copy(int index) {
+ mutator.setSafe(index, (int) (codec.decodeLong(ptr, sortOrder) % MILLIS_IN_DAY));
}
}
private class TimeStampCopier extends Copier {
- public TimeStampCopier(int columnIndex, ResultSet result, NullableTimeStampVector.Mutator mutator) {
- super(columnIndex, result, mutator);
+ public TimeStampCopier(int columnIndex, NullableTimeStampVector.Mutator mutator) {
+ super(columnIndex, mutator);
}
@Override
- void copy(int index) throws SQLException {
- Timestamp stamp = result.getTimestamp(columnIndex);
- if (stamp != null) {
- mutator.setSafe(index, stamp.getTime());
- }
-
+ void copy(int index) {
+ mutator.setSafe(index, codec.decodeLong(ptr, sortOrder));
}
}
private class BitCopier extends Copier {
- public BitCopier(int columnIndex, ResultSet result, NullableBitVector.Mutator mutator) {
- super(columnIndex, result, mutator);
+ public BitCopier(int columnIndex, NullableBitVector.Mutator mutator) {
+ super(columnIndex, mutator);
}
@Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getBoolean(columnIndex) ? 1 : 0);
- if (result.wasNull()) {
- mutator.setNull(index);
- }
+ void copy(int index) {
+ mutator.setSafe(index, codec.decodeByte(ptr, sortOrder));
}
}