From 4b163aa4c7a253052f6090d94e0be6caf4e316fa Mon Sep 17 00:00:00 2001 From: sjudeng Date: Fri, 19 Feb 2016 08:16:49 -0600 Subject: [PATCH 1/3] Update titan-hadoop-2 test dependency to hbase-1.0 and add HBaseInputFormat tests to reproduce issue #1268. --- titan-hadoop-parent/pom.xml | 2 +- titan-hadoop-parent/titan-hadoop-2/pom.xml | 22 +++- .../titan/hadoop/AbstractInputFormatIT.java | 102 ++++++++++++++++++ .../titan/hadoop/CassandraInputFormatIT.java | 92 +--------------- .../titan/hadoop/HBaseInputFormatIT.java | 46 ++++++++ .../src/test/resources/hbase-read.properties | 29 +++++ 6 files changed, 201 insertions(+), 92 deletions(-) create mode 100644 titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/AbstractInputFormatIT.java create mode 100644 titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/HBaseInputFormatIT.java create mode 100644 titan-hadoop-parent/titan-hadoop-core/src/test/resources/hbase-read.properties diff --git a/titan-hadoop-parent/pom.xml b/titan-hadoop-parent/pom.xml index b97a17b544..4c8b9bc2a1 100644 --- a/titan-hadoop-parent/pom.xml +++ b/titan-hadoop-parent/pom.xml @@ -14,7 +14,7 @@ ${project.parent.basedir} ${skipTests} - true + ${skipTests} ${skipTests} diff --git a/titan-hadoop-parent/titan-hadoop-2/pom.xml b/titan-hadoop-parent/titan-hadoop-2/pom.xml index f2f65bc519..c301ea8013 100644 --- a/titan-hadoop-parent/titan-hadoop-2/pom.xml +++ b/titan-hadoop-parent/titan-hadoop-2/pom.xml @@ -40,6 +40,14 @@ test true + + + ${project.groupId} + titan-hbase-core + ${project.version} + test + org.apache.mrunit mrunit @@ -50,7 +58,7 @@ org.apache.hbase hbase-server - ${hbase098.version} + ${hbase100.version} true test @@ -58,14 +66,24 @@ org.mortbay.jetty servlet-api-2.5 + + com.lmax + disruptor + org.apache.hbase hbase-client - ${hbase098.version} + ${hbase100.version} true test + + + com.lmax + disruptor + + ${project.groupId} diff --git a/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/AbstractInputFormatIT.java b/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/AbstractInputFormatIT.java new file mode 100644 index 0000000000..945d0f90ee --- /dev/null +++ b/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/AbstractInputFormatIT.java @@ -0,0 +1,102 @@ +package com.thinkaurelius.titan.hadoop; + +import com.thinkaurelius.titan.core.Cardinality; +import com.thinkaurelius.titan.core.TitanVertex; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.example.GraphOfTheGodsFactory; +import com.thinkaurelius.titan.graphdb.TitanGraphBaseTest; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; +import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; +import org.junit.Test; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public abstract class AbstractInputFormatIT extends TitanGraphBaseTest { + + + @Test + public void testReadGraphOfTheGods() throws Exception { + GraphOfTheGodsFactory.load(graph, null, true); + assertEquals(12L, (long) graph.traversal().V().count().next()); + Graph g = getGraph(); + GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)); + assertEquals(12L, (long) t.V().count().next()); + } + + @Test + public void testReadWideVertexWithManyProperties() throws Exception { + int numProps = 1 << 16; + + long numV = 1; + mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make(); + mgmt.commit(); + finishSchema(); + + for (int j = 0; j < numV; j++) { + Vertex v = graph.addVertex(); + for (int i = 0; i < numProps; i++) { + v.property("p", i); + } + } + graph.tx().commit(); + + assertEquals(numV, (long) graph.traversal().V().count().next()); + Map propertiesOnVertex = graph.traversal().V().valueMap().next(); + List valuesOnP = (List)propertiesOnVertex.values().iterator().next(); + assertEquals(numProps, valuesOnP.size()); + Graph g = getGraph(); + GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)); + assertEquals(numV, (long) t.V().count().next()); + propertiesOnVertex = t.V().valueMap().next(); + valuesOnP = (List)propertiesOnVertex.values().iterator().next(); + assertEquals(numProps, valuesOnP.size()); + } + + @Test + public void testReadSelfEdge() throws Exception { + GraphOfTheGodsFactory.load(graph, null, true); + assertEquals(12L, (long) graph.traversal().V().count().next()); + + // Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes + TitanVertex sky = (TitanVertex)graph.query().has("name", "sky").vertices().iterator().next(); + assertNotNull(sky); + assertEquals("sky", sky.value("name")); + assertEquals(1L, sky.query().direction(Direction.IN).edgeCount()); + assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount()); + assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount()); + sky.addEdge("lives", sky, "reason", "testReadSelfEdge"); + assertEquals(2L, sky.query().direction(Direction.IN).edgeCount()); + assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount()); + assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount()); + graph.tx().commit(); + + // Read the new edge using the inputformat + Graph g = getGraph(); + GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)); + Iterator edgeIdIter = t.V().has("name", "sky").bothE().id(); + assertNotNull(edgeIdIter); + assertTrue(edgeIdIter.hasNext()); + Set edges = Sets.newHashSet(edgeIdIter); + assertEquals(2, edges.size()); + } + + abstract protected Graph getGraph() throws IOException, ConfigurationException; +} diff --git a/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/CassandraInputFormatIT.java b/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/CassandraInputFormatIT.java index 727a98d5ef..701d0de142 100644 --- a/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/CassandraInputFormatIT.java +++ b/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/CassandraInputFormatIT.java @@ -1,101 +1,15 @@ package com.thinkaurelius.titan.hadoop; import com.thinkaurelius.titan.CassandraStorageSetup; -import com.thinkaurelius.titan.core.Cardinality; -import com.thinkaurelius.titan.core.TitanVertex; import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; -import com.thinkaurelius.titan.example.GraphOfTheGodsFactory; -import com.thinkaurelius.titan.graphdb.TitanGraphBaseTest; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; -import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; -import org.apache.tinkerpop.gremlin.structure.Direction; -import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; -import org.junit.Test; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +public class CassandraInputFormatIT extends AbstractInputFormatIT { -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -public class CassandraInputFormatIT extends TitanGraphBaseTest { - - - @Test - public void testReadGraphOfTheGods() { - GraphOfTheGodsFactory.load(graph, null, true); - assertEquals(12L, (long) graph.traversal().V().count().next()); - Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties"); - GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)); - assertEquals(12L, (long) t.V().count().next()); - } - - @Test - public void testReadWideVertexWithManyProperties() { - int numProps = 1 << 16; - - long numV = 1; - mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make(); - mgmt.commit(); - finishSchema(); - - for (int j = 0; j < numV; j++) { - Vertex v = graph.addVertex(); - for (int i = 0; i < numProps; i++) { - v.property("p", i); - } - } - graph.tx().commit(); - - assertEquals(numV, (long) graph.traversal().V().count().next()); - Map propertiesOnVertex = graph.traversal().V().valueMap().next(); - List valuesOnP = (List)propertiesOnVertex.values().iterator().next(); - assertEquals(numProps, valuesOnP.size()); - Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties"); - GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)); - assertEquals(numV, (long) t.V().count().next()); - propertiesOnVertex = t.V().valueMap().next(); - valuesOnP = (List)propertiesOnVertex.values().iterator().next(); - assertEquals(numProps, valuesOnP.size()); - } - - @Test - public void testReadSelfEdge() { - GraphOfTheGodsFactory.load(graph, null, true); - assertEquals(12L, (long) graph.traversal().V().count().next()); - - // Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes - TitanVertex sky = (TitanVertex)graph.query().has("name", "sky").vertices().iterator().next(); - assertNotNull(sky); - assertEquals("sky", sky.value("name")); - assertEquals(1L, sky.query().direction(Direction.IN).edgeCount()); - assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount()); - assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount()); - sky.addEdge("lives", sky, "reason", "testReadSelfEdge"); - assertEquals(2L, sky.query().direction(Direction.IN).edgeCount()); - assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount()); - assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount()); - graph.tx().commit(); - - // Read the new edge using the inputformat - Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties"); - GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)); - Iterator edgeIdIter = t.V().has("name", "sky").bothE().id(); - assertNotNull(edgeIdIter); - assertTrue(edgeIdIter.hasNext()); - Set edges = Sets.newHashSet(edgeIdIter); - assertEquals(2, edges.size()); + protected Graph getGraph() { + return GraphFactory.open("target/test-classes/cassandra-read.properties"); } @Override diff --git a/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/HBaseInputFormatIT.java b/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/HBaseInputFormatIT.java new file mode 100644 index 0000000000..67c4ec98ae --- /dev/null +++ b/titan-hadoop-parent/titan-hadoop-core/src/test/java/com/thinkaurelius/titan/hadoop/HBaseInputFormatIT.java @@ -0,0 +1,46 @@ +package com.thinkaurelius.titan.hadoop; + +import com.thinkaurelius.titan.HBaseStorageSetup; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class HBaseInputFormatIT extends AbstractInputFormatIT { + + @BeforeClass + public static void startHBase() throws IOException, BackendException { + HBaseStorageSetup.startHBase(); + } + + @AfterClass + public static void stopHBase() { + // Workaround for https://issues.apache.org/jira/browse/HBASE-10312 + if (VersionInfo.getVersion().startsWith("0.96")) + HBaseStorageSetup.killIfRunning(); + } + + protected Graph getGraph() throws IOException, ConfigurationException { + final PropertiesConfiguration config = new PropertiesConfiguration("target/test-classes/hbase-read.properties"); + Path baseOutDir = Paths.get((String) config.getProperty("gremlin.hadoop.outputLocation")); + baseOutDir.toFile().mkdirs(); + String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString(); + config.setProperty("gremlin.hadoop.outputLocation", outDir); + return GraphFactory.open(config); + } + + @Override + public WriteConfiguration getConfiguration() { + return HBaseStorageSetup.getHBaseGraphConfiguration(); + } +} diff --git a/titan-hadoop-parent/titan-hadoop-core/src/test/resources/hbase-read.properties b/titan-hadoop-parent/titan-hadoop-core/src/test/resources/hbase-read.properties new file mode 100644 index 0000000000..2a45273100 --- /dev/null +++ b/titan-hadoop-parent/titan-hadoop-core/src/test/resources/hbase-read.properties @@ -0,0 +1,29 @@ +gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph +gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.hbase.HBaseInputFormat +gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat +#gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat + +gremlin.hadoop.jarsInDistributedCache=true +gremlin.hadoop.inputLocation=none +gremlin.hadoop.outputLocation=output +titanmr.ioformat.conf.storage.backend=hbase +titanmr.ioformat.conf.storage.hostname=localhost +##################################### +# GiraphGraphComputer Configuration # +##################################### +giraph.minWorkers=1 +giraph.maxWorkers=1 +giraph.SplitMasterWorker=false +giraph.useOutOfCoreGraph=true +giraph.useOutOfCoreMessages=true +mapred.map.child.java.opts=-Xmx1024m +mapred.reduce.child.java.opts=-Xmx1024m +giraph.numInputThreads=4 +giraph.numComputeThreads=4 +giraph.maxMessagesInMemory=100000 +#################################### +# SparkGraphComputer Configuration # +#################################### +spark.master=local[4] +spark.executor.memory=1g +spark.serializer=org.apache.spark.serializer.KryoSerializer From 30c59eaaee5929c46f02353aef12554ea952a9dd Mon Sep 17 00:00:00 2001 From: sjudeng Date: Fri, 19 Feb 2016 08:39:36 -0600 Subject: [PATCH 2/3] Resolve issue #1268. --- .../hadoop/formats/hbase/HBaseBinaryInputFormat.java | 9 +++++---- .../hadoop/formats/hbase/HBaseBinaryRecordReader.java | 8 +++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryInputFormat.java b/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryInputFormat.java index ffcb5a0d58..71345fc944 100644 --- a/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryInputFormat.java +++ b/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryInputFormat.java @@ -10,8 +10,10 @@ import com.thinkaurelius.titan.hadoop.formats.util.AbstractBinaryInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableRecordReader; @@ -32,7 +34,7 @@ public class HBaseBinaryInputFormat extends AbstractBinaryInputFormat { private static final Logger log = LoggerFactory.getLogger(HBaseBinaryInputFormat.class); private final TableInputFormat tableInputFormat = new TableInputFormat(); - private TableRecordReader tableReader; + private RecordReader tableReader; private byte[] inputCFBytes; private RecordReader> titanRecordReader; @@ -43,8 +45,7 @@ public List getSplits(final JobContext jobContext) throws IOExceptio @Override public RecordReader> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - tableReader = - (TableRecordReader) tableInputFormat.createRecordReader(inputSplit, taskAttemptContext); + tableReader = tableInputFormat.createRecordReader(inputSplit, taskAttemptContext); titanRecordReader = new HBaseBinaryRecordReader(tableReader, inputCFBytes); return titanRecordReader; @@ -90,7 +91,7 @@ public void setConf(final Configuration config) { this.tableInputFormat.setConf(config); } - public TableRecordReader getTableReader() { + public RecordReader getTableReader() { return tableReader; } diff --git a/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryRecordReader.java b/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryRecordReader.java index 269eb4207a..ed751b7753 100644 --- a/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryRecordReader.java +++ b/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/hbase/HBaseBinaryRecordReader.java @@ -5,6 +5,8 @@ import com.thinkaurelius.titan.diskstorage.StaticBuffer; import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableRecordReader; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; @@ -17,11 +19,11 @@ public class HBaseBinaryRecordReader extends RecordReader> { - private TableRecordReader reader; + private RecordReader reader; private final byte[] edgestoreFamilyBytes; - public HBaseBinaryRecordReader(final TableRecordReader reader, final byte[] edgestoreFamilyBytes) { + public HBaseBinaryRecordReader(final RecordReader reader, final byte[] edgestoreFamilyBytes) { this.reader = reader; this.edgestoreFamilyBytes = edgestoreFamilyBytes; } @@ -52,7 +54,7 @@ public void close() throws IOException { } @Override - public float getProgress() { + public float getProgress() throws IOException, InterruptedException { return this.reader.getProgress(); } From 21af128089084c6126df5fd71c0de951c81c13bd Mon Sep 17 00:00:00 2001 From: sjudeng Date: Fri, 19 Feb 2016 09:05:15 -0600 Subject: [PATCH 3/3] Update output location for HBaseInputFormat tests. --- .../titan-hadoop-core/src/test/resources/hbase-read.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/titan-hadoop-parent/titan-hadoop-core/src/test/resources/hbase-read.properties b/titan-hadoop-parent/titan-hadoop-core/src/test/resources/hbase-read.properties index 2a45273100..ce87977581 100644 --- a/titan-hadoop-parent/titan-hadoop-core/src/test/resources/hbase-read.properties +++ b/titan-hadoop-parent/titan-hadoop-core/src/test/resources/hbase-read.properties @@ -5,7 +5,7 @@ gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutp gremlin.hadoop.jarsInDistributedCache=true gremlin.hadoop.inputLocation=none -gremlin.hadoop.outputLocation=output +gremlin.hadoop.outputLocation=target/output titanmr.ioformat.conf.storage.backend=hbase titanmr.ioformat.conf.storage.hostname=localhost #####################################