diff --git a/titan-hadoop-parent/pom.xml b/titan-hadoop-parent/pom.xml
index b97a17b544..4c8b9bc2a1 100644
--- a/titan-hadoop-parent/pom.xml
+++ b/titan-hadoop-parent/pom.xml
@@ -14,7 +14,7 @@
${project.parent.basedir}
${skipTests}
- true
+ ${skipTests}
${skipTests}
diff --git a/titan-hadoop-parent/titan-hadoop-2/pom.xml b/titan-hadoop-parent/titan-hadoop-2/pom.xml
index f2f65bc519..c301ea8013 100644
--- a/titan-hadoop-parent/titan-hadoop-2/pom.xml
+++ b/titan-hadoop-parent/titan-hadoop-2/pom.xml
@@ -40,6 +40,14 @@
test
true
+
+
+ ${project.groupId}
+ titan-hbase-core
+ ${project.version}
+ test
+
org.apache.mrunit
mrunit
@@ -50,7 +58,7 @@
org.apache.hbase
hbase-server
- ${hbase098.version}
+ ${hbase100.version}
true
test
@@ -58,14 +66,24 @@
org.mortbay.jetty
servlet-api-2.5
+
+ com.lmax
+ disruptor
+
org.apache.hbase
hbase-client
- ${hbase098.version}
+ ${hbase100.version}
true
test
+
+
+ com.lmax
+ disruptor
+
+
${project.groupId}
diff --git a/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryInputFormat.java b/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryInputFormat.java
index ffcb5a0d58..71345fc944 100644
--- a/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryInputFormat.java
+++ b/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryInputFormat.java
@@ -10,8 +10,10 @@
import com.thinkaurelius.titan.hadoop.formats.util.AbstractBinaryInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
@@ -32,7 +34,7 @@ public class HBaseBinaryInputFormat extends AbstractBinaryInputFormat {
private static final Logger log = LoggerFactory.getLogger(HBaseBinaryInputFormat.class);
private final TableInputFormat tableInputFormat = new TableInputFormat();
- private TableRecordReader tableReader;
+    private RecordReader<ImmutableBytesWritable, Result> tableReader;
private byte[] inputCFBytes;
private RecordReader> titanRecordReader;
@@ -43,8 +45,7 @@ public List getSplits(final JobContext jobContext) throws IOExceptio
@Override
public RecordReader> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- tableReader =
- (TableRecordReader) tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
+ tableReader = tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
titanRecordReader =
new HBaseBinaryRecordReader(tableReader, inputCFBytes);
return titanRecordReader;
@@ -90,7 +91,7 @@ public void setConf(final Configuration config) {
this.tableInputFormat.setConf(config);
}
- public TableRecordReader getTableReader() {
+    public RecordReader<ImmutableBytesWritable, Result> getTableReader() {
return tableReader;
}
diff --git a/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryRecordReader.java b/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryRecordReader.java
index 269eb4207a..ed751b7753 100644
--- a/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryRecordReader.java
+++ b/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryRecordReader.java
@@ -5,6 +5,8 @@
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -17,11 +19,11 @@
public class HBaseBinaryRecordReader extends RecordReader> {
- private TableRecordReader reader;
+    private RecordReader<ImmutableBytesWritable, Result> reader;
private final byte[] edgestoreFamilyBytes;
- public HBaseBinaryRecordReader(final TableRecordReader reader, final byte[] edgestoreFamilyBytes) {
+    public HBaseBinaryRecordReader(final RecordReader<ImmutableBytesWritable, Result> reader, final byte[] edgestoreFamilyBytes) {
this.reader = reader;
this.edgestoreFamilyBytes = edgestoreFamilyBytes;
}
@@ -52,7 +54,7 @@ public void close() throws IOException {
}
@Override
- public float getProgress() {
+ public float getProgress() throws IOException, InterruptedException {
return this.reader.getProgress();
}
diff --git a/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/AbstractInputFormatIT.java b/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/AbstractInputFormatIT.java
new file mode 100644
index 0000000000..945d0f90ee
--- /dev/null
+++ b/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/AbstractInputFormatIT.java
@@ -0,0 +1,102 @@
+package com.thinkaurelius.titan.hadoop;
+
+import com.thinkaurelius.titan.core.Cardinality;
+import com.thinkaurelius.titan.core.TitanVertex;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.example.GraphOfTheGodsFactory;
+import com.thinkaurelius.titan.graphdb.TitanGraphBaseTest;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public abstract class AbstractInputFormatIT extends TitanGraphBaseTest {
+
+
+ @Test
+ public void testReadGraphOfTheGods() throws Exception {
+ GraphOfTheGodsFactory.load(graph, null, true);
+ assertEquals(12L, (long) graph.traversal().V().count().next());
+ Graph g = getGraph();
+ GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
+ assertEquals(12L, (long) t.V().count().next());
+ }
+
+ @Test
+ public void testReadWideVertexWithManyProperties() throws Exception {
+ int numProps = 1 << 16;
+
+ long numV = 1;
+ mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
+ mgmt.commit();
+ finishSchema();
+
+ for (int j = 0; j < numV; j++) {
+ Vertex v = graph.addVertex();
+ for (int i = 0; i < numProps; i++) {
+ v.property("p", i);
+ }
+ }
+ graph.tx().commit();
+
+ assertEquals(numV, (long) graph.traversal().V().count().next());
+ Map propertiesOnVertex = graph.traversal().V().valueMap().next();
+ List> valuesOnP = (List)propertiesOnVertex.values().iterator().next();
+ assertEquals(numProps, valuesOnP.size());
+ Graph g = getGraph();
+ GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
+ assertEquals(numV, (long) t.V().count().next());
+ propertiesOnVertex = t.V().valueMap().next();
+ valuesOnP = (List)propertiesOnVertex.values().iterator().next();
+ assertEquals(numProps, valuesOnP.size());
+ }
+
+ @Test
+ public void testReadSelfEdge() throws Exception {
+ GraphOfTheGodsFactory.load(graph, null, true);
+ assertEquals(12L, (long) graph.traversal().V().count().next());
+
+ // Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes
+ TitanVertex sky = (TitanVertex)graph.query().has("name", "sky").vertices().iterator().next();
+ assertNotNull(sky);
+ assertEquals("sky", sky.value("name"));
+ assertEquals(1L, sky.query().direction(Direction.IN).edgeCount());
+ assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount());
+ assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount());
+ sky.addEdge("lives", sky, "reason", "testReadSelfEdge");
+ assertEquals(2L, sky.query().direction(Direction.IN).edgeCount());
+ assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount());
+ assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount());
+ graph.tx().commit();
+
+ // Read the new edge using the inputformat
+ Graph g = getGraph();
+ GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
+ Iterator