From 34db7ef701627a9fde72627ff449305a672191fc Mon Sep 17 00:00:00 2001
From: root
Date: Thu, 1 Sep 2016 20:11:59 -0700
Subject: [PATCH] test-pr

---
 pom.xml                                            | 269 ++++++++--------
 .../scala/com/neilferguson/PopStrat.scala          | 298 +++++++++---------
 2 files changed, 290 insertions(+), 277 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5e3ce0b..ed78647 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,136 +1,133 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <groupId>com.neilferguson</groupId>
-    <artifactId>popstrat</artifactId>
-    <version>0.1-SNAPSHOT</version>
-    <properties>
-        <spark.version>1.2.0</spark.version>
-        <h2o.version>3.0.0.8</h2o.version>
-        <sparklingwater.version>1.2.5</sparklingwater.version>
-        <adam.version>0.16.0</adam.version>
-    </properties>
-    <repositories>
-        <repository>
-            <id>scala-tools.org</id>
-            <name>Scala-tools Maven2 Repository</name>
-            <url>http://scala-tools.org/repo-releases</url>
-        </repository>
-    </repositories>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.10</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.bdgenomics.adam</groupId>
-            <artifactId>adam-core</artifactId>
-            <version>${adam.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.bdgenomics.adam</groupId>
-            <artifactId>adam-apis</artifactId>
-            <version>${adam.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>ai.h2o</groupId>
-            <artifactId>sparkling-water-core_2.10</artifactId>
-            <version>${sparklingwater.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>ai.h2o</groupId>
-            <artifactId>sparkling-water-examples_2.10</artifactId>
-            <version>${sparklingwater.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>ai.h2o</groupId>
-            <artifactId>h2o-core</artifactId>
-            <version>${h2o.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>ai.h2o</groupId>
-            <artifactId>h2o-scala_2.10</artifactId>
-            <version>${h2o.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>ai.h2o</groupId>
-            <artifactId>h2o-algos</artifactId>
-            <version>${h2o.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>ai.h2o</groupId>
-            <artifactId>h2o-app</artifactId>
-            <version>${h2o.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>ai.h2o</groupId>
-            <artifactId>h2o-persist-hdfs</artifactId>
-            <version>${h2o.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>ai.h2o</groupId>
-            <artifactId>google-analytics-java</artifactId>
-            <version>1.1.2-H2O-CUSTOM</version>
-        </dependency>
-        <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-            <version>2.7</version>
-        </dependency>
-    </dependencies>
-    <build>
-        <sourceDirectory>src/main/scala</sourceDirectory>
-        <plugins>
-            <plugin>
-                <groupId>org.scala-tools</groupId>
-                <artifactId>maven-scala-plugin</artifactId>
-                <version>2.14.1</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>2.3.2</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>2.3</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <filters>
-                                <filter>
-                                    <artifact>*:*</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
-                            <finalName>uber-${project.artifactId}-${project.version}</finalName>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>popstrat-master.src.main.scala.com</groupId>
+    <artifactId>neilferguson</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <properties>
+        <spark.version>1.6.1</spark.version>
+        <h2o.version>3.8.2.6</h2o.version>
+        <sparklingwater.version>1.6.5</sparklingwater.version>
+        <adam.version>0.19.0</adam.version>
+    </properties>
+    <repositories>
+        <repository>
+            <id>scala-tools.org</id>
+            <name>Scala-tools Maven2 Repository</name>
+            <url>http://scala-tools.org/repo-releases</url>
+        </repository>
+    </repositories>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.bdgenomics.adam</groupId>
+            <artifactId>adam-core_2.10</artifactId>
+            <version>${adam.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.bdgenomics.adam</groupId>
+            <artifactId>adam-apis_2.10</artifactId>
+            <version>${adam.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ai.h2o</groupId>
+            <artifactId>sparkling-water-core_2.10</artifactId>
+            <version>${sparklingwater.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ai.h2o</groupId>
+            <artifactId>sparkling-water-examples_2.10</artifactId>
+            <version>${sparklingwater.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ai.h2o</groupId>
+            <artifactId>h2o-core</artifactId>
+            <version>${h2o.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ai.h2o</groupId>
+            <artifactId>h2o-scala_2.10</artifactId>
+            <version>${h2o.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ai.h2o</groupId>
+            <artifactId>h2o-algos</artifactId>
+            <version>${h2o.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ai.h2o</groupId>
+            <artifactId>h2o-app</artifactId>
+            <version>${h2o.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ai.h2o</groupId>
+            <artifactId>h2o-persist-hdfs</artifactId>
+            <version>${h2o.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ai.h2o</groupId>
+            <artifactId>google-analytics-java</artifactId>
+            <version>1.1.2-H2O-CUSTOM</version>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>2.7</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.3.2</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <finalName>uber-${project.artifactId}-${project.version}</finalName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/src/main/scala/com/neilferguson/PopStrat.scala b/src/main/scala/com/neilferguson/PopStrat.scala
index 286fb39..2cc7d0e 100644
--- a/src/main/scala/com/neilferguson/PopStrat.scala
+++ b/src/main/scala/com/neilferguson/PopStrat.scala
@@ -1,141 +1,157 @@
-package com.neilferguson
-
-import hex.FrameSplitter
-import hex.deeplearning.DeepLearning
-import hex.deeplearning.DeepLearningModel.DeepLearningParameters
-import hex.deeplearning.DeepLearningModel.DeepLearningParameters.Activation
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext.rddToPairRDDFunctions
-import org.apache.spark.h2o.H2OContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.formats.avro.{Genotype, GenotypeAllele}
-import water.Key
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable.Range.inclusive
-import scala.io.Source
-
-object PopStrat {
-
-  def main(args: Array[String]): Unit = {
-
-    val genotypeFile = args(0)
-    val panelFile = args(1)
-
-    val master = if (args.length > 2) Some(args(2)) else None
-    val conf = new SparkConf().setAppName("PopStrat")
-    master.foreach(conf.setMaster)
-    val sc = new SparkContext(conf)
-
-    // Create a set of the populations that we want to predict
-    // Then create a map of sample ID -> population so that we can filter out the samples we're not interested in
-    val populations = Set("GBR", "ASW", "CHB")
-    def extract(file: String, filter: (String, String) => Boolean): Map[String,String] = {
-      Source.fromFile(file).getLines().map(line => {
-        val tokens = line.split("\t").toList
-        tokens(0) -> tokens(1)
-      }).toMap.filter(tuple => filter(tuple._1, tuple._2))
-    }
-    val panel: Map[String,String] = extract(panelFile, (sampleID: String, pop: String) => populations.contains(pop))
-
-    // Load the ADAM genotypes from the parquet file(s)
-    // Next, filter the genotypes so that we're left with only those in the populations we're interested in
-    val allGenotypes: RDD[Genotype] = sc.loadGenotypes(genotypeFile)
-    val genotypes: RDD[Genotype] = allGenotypes.filter(genotype => {panel.contains(genotype.getSampleId)})
-
-    // Convert the Genotype objects to our own SampleVariant objects to try and conserve memory
-    case class SampleVariant(sampleId: String, variantId: Int, alternateCount: Int)
-    def variantId(genotype: Genotype): String = {
-      val name = genotype.getVariant.getContig.getContigName
-      val start = genotype.getVariant.getStart
-      val end = genotype.getVariant.getEnd
-      s"$name:$start:$end"
-    }
-    def alternateCount(genotype: Genotype): Int = {
-      genotype.getAlleles.asScala.count(_ != GenotypeAllele.Ref)
-    }
-    def toVariant(genotype: Genotype): SampleVariant = {
-      // Intern sample IDs as they will be repeated a lot
-      new SampleVariant(genotype.getSampleId.intern(), variantId(genotype).hashCode(), alternateCount(genotype))
-    }
-    val variantsRDD: RDD[SampleVariant] = genotypes.map(toVariant)
-
-    // Group the variants by sample ID so we can process the variants sample-by-sample
-    // Then get the total number of samples. This will be used to find variants that are missing for some samples.
-    // Group the variants by variant ID and filter out those variants that are missing from some samples
-    val variantsBySampleId: RDD[(String, Iterable[SampleVariant])] = variantsRDD.groupBy(_.sampleId)
-    val sampleCount: Long = variantsBySampleId.count()
-    println("Found " + sampleCount + " samples")
-    val variantsByVariantId: RDD[(Int, Iterable[SampleVariant])] = variantsRDD.groupBy(_.variantId).filter {
-      case (_, sampleVariants) => sampleVariants.size == sampleCount
-    }
-
-    // Make a map of variant ID -> count of samples with an alternate count of greater than zero
-    // then filter out those variants that are not in our desired frequency range. The objective here is simply to
-    // reduce the number of dimensions in the data set to make it easier to train the model.
-    // The specified range is fairly arbitrary and was chosen based on the fact that it includes a reasonable
-    // number of variants, but not too many.
-    val variantFrequencies: collection.Map[Int, Int] = variantsByVariantId.map {
-      case (variantId, sampleVariants) => (variantId, sampleVariants.count(_.alternateCount > 0))
-    }.collectAsMap()
-    val permittedRange = inclusive(11, 11)
-    val filteredVariantsBySampleId: RDD[(String, Iterable[SampleVariant])] = variantsBySampleId.map {
-      case (sampleId, sampleVariants) =>
-        val filteredSampleVariants = sampleVariants.filter(variant => permittedRange.contains(
-          variantFrequencies.getOrElse(variant.variantId, -1)))
-        (sampleId, filteredSampleVariants)
-    }
-
-    // Sort the variants for each sample ID. Each sample should now have the same number of sorted variants.
-    // All items in the RDD should now have the same variants in the same order so we can just use the first
-    // one to construct our header
-    // Next construct the rows of our SchemaRDD from the variants
-    val sortedVariantsBySampleId: RDD[(String, Array[SampleVariant])] = filteredVariantsBySampleId.map {
-      case (sampleId, variants) =>
-        (sampleId, variants.toArray.sortBy(_.variantId))
-    }
-    val header = StructType(Array(StructField("Region", StringType)) ++
-      sortedVariantsBySampleId.first()._2.map(variant => {StructField(variant.variantId.toString, IntegerType)}))
-    val rowRDD: RDD[Row] = sortedVariantsBySampleId.map {
-      case (sampleId, sortedVariants) =>
-        val region: Array[String] = Array(panel.getOrElse(sampleId, "Unknown"))
-        val alternateCounts: Array[Int] = sortedVariants.map(_.alternateCount)
-        Row.fromSeq(region ++ alternateCounts)
-    }
-
-    // Create the SchemaRDD from the header and rows and convert the SchemaRDD into a H2O dataframe
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val schemaRDD = sqlContext.applySchema(rowRDD, header)
-    val h2oContext = new H2OContext(sc).start()
-    import h2oContext._
-    val dataFrame = h2oContext.toDataFrame(schemaRDD)
-
-    // Split the dataframe into 50% training, 30% test, and 20% validation data
-    val frameSplitter = new FrameSplitter(dataFrame, Array(.5, .3), Array("training", "test", "validation").map(Key.make), null)
-    water.H2O.submitTask(frameSplitter)
-    val splits = frameSplitter.getResult
-    val training = splits(0)
-    val validation = splits(2)
-
-    // Set the parameters for our deep learning model.
-    val deepLearningParameters = new DeepLearningParameters()
-    deepLearningParameters._train = training
-    deepLearningParameters._valid = validation
-    deepLearningParameters._response_column = "Region"
-    deepLearningParameters._epochs = 10
-    deepLearningParameters._activation = Activation.RectifierWithDropout
-    deepLearningParameters._hidden = Array[Int](100,100)
-
-    // Train the deep learning model
-    val deepLearning = new DeepLearning(deepLearningParameters)
-    val deepLearningModel = deepLearning.trainModel.get
-
-    // Score the model against the entire dataset (training, test, and validation data)
-    // This causes the confusion matrix to be printed
-    deepLearningModel.score(dataFrame)('predict)
-
-  }
-
-}
+package com.neilferguson
+
+import hex.FrameSplitter
+import hex.deeplearning.DeepLearning
+import hex.deeplearning.DeepLearningModel.DeepLearningParameters
+import hex.deeplearning.DeepLearningModel.DeepLearningParameters.Activation
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.h2o.H2OContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.types.DataTypes
+import org.bdgenomics.adam.rdd.ADAMContext._
+import org.bdgenomics.formats.avro.{Genotype, GenotypeAllele}
+import water.Key
+import hex._
+import water.fvec._
+import water.support._
+import scala.collection.JavaConverters._
+import scala.collection.immutable.Range.inclusive
+import scala.io.Source
+import org.apache.spark.sql.types.DataTypes
+
+
+
+import _root_.hex.Distribution.Family
+import _root_.hex.deeplearning.DeepLearningModel
+import _root_.hex.tree.gbm.GBMModel
+import _root_.hex.{Model, ModelMetricsBinomial}
+object PopStrat {
+
+  def main(args: Array[String]): Unit = {
+
+    val genotypeFile = args(0)
+    val panelFile = args(1)
+
+    val master = if (args.length > 2) Some(args(2)) else None
+    val conf = new SparkConf().setAppName("PopStrat")
+    master.foreach(conf.setMaster)
+    val sc = new SparkContext(conf)
+
+    // Create a set of the populations that we want to predict
+    // Then create a map of sample ID -> population so that we can filter out the samples we're not interested in
+    val populations = Set("GBR", "ASW", "CHB")
+    def extract(file: String, filter: (String, String) => Boolean): Map[String,String] = {
+      Source.fromFile(file).getLines().map(line => {
+        val tokens = line.split("\t").toList
+        tokens(0) -> tokens(1)
+      }).toMap.filter(tuple => filter(tuple._1, tuple._2))
+    }
+    val panel: Map[String,String] = extract(panelFile, (sampleID: String, pop: String) => populations.contains(pop))
+
+    // Load the ADAM genotypes from the parquet file(s)
+    // Next, filter the genotypes so that we're left with only those in the populations we're interested in
+    val allGenotypes: RDD[Genotype] = sparkContextToADAMContext(sc).loadGenotypes(genotypeFile)
+    val genotypes: RDD[Genotype] = allGenotypes.filter(genotype => {panel.contains(genotype.getSampleId)})
+
+    // Convert the Genotype objects to our own SampleVariant objects to try and conserve memory
+    case class SampleVariant(sampleId: String, variantId: Int, alternateCount: Int)
+    def variantId(genotype: Genotype): String = {
+      val name = genotype.getVariant.getContig.getContigName
+      val start = genotype.getVariant.getStart
+      val end = genotype.getVariant.getEnd
+      s"$name:$start:$end"
+    }
+    def alternateCount(genotype: Genotype): Int = {
+      genotype.getAlleles.asScala.count(_ != GenotypeAllele.Ref)
+    }
+    def toVariant(genotype: Genotype): SampleVariant = {
+      // Intern sample IDs as they will be repeated a lot
+      new SampleVariant(genotype.getSampleId.intern(), variantId(genotype).hashCode(), alternateCount(genotype))
+    }
+    val variantsRDD: RDD[SampleVariant] = genotypes.map(toVariant)
+
+    // Group the variants by sample ID so we can process the variants sample-by-sample
+    // Then get the total number of samples. This will be used to find variants that are missing for some samples.
+    // Group the variants by variant ID and filter out those variants that are missing from some samples
+    val variantsBySampleId: RDD[(String, Iterable[SampleVariant])] = variantsRDD.groupBy(_.sampleId)
+    val sampleCount: Long = variantsBySampleId.count()
+    println("Found " + sampleCount + " samples")
+    val variantsByVariantId: RDD[(Int, Iterable[SampleVariant])] = variantsRDD.groupBy(_.variantId).filter {
+      case (_, sampleVariants) => sampleVariants.size == sampleCount
+    }
+
+    // Make a map of variant ID -> count of samples with an alternate count of greater than zero
+    // then filter out those variants that are not in our desired frequency range. The objective here is simply to
+    // reduce the number of dimensions in the data set to make it easier to train the model.
+    // The specified range is fairly arbitrary and was chosen based on the fact that it includes a reasonable
+    // number of variants, but not too many.
+    val variantFrequencies: collection.Map[Int, Int] = variantsByVariantId.map {
+      case (variantId, sampleVariants) => (variantId, sampleVariants.count(_.alternateCount > 0))
+    }.collectAsMap()
+    val permittedRange = inclusive(11, 11)
+    val filteredVariantsBySampleId: RDD[(String, Iterable[SampleVariant])] = variantsBySampleId.map {
+      case (sampleId, sampleVariants) =>
+        val filteredSampleVariants = sampleVariants.filter(variant => permittedRange.contains(
+          variantFrequencies.getOrElse(variant.variantId, -1)))
+        (sampleId, filteredSampleVariants)
+    }
+
+    // Sort the variants for each sample ID. Each sample should now have the same number of sorted variants.
+    // All items in the RDD should now have the same variants in the same order so we can just use the first
+    // one to construct our header
+    // Next construct the rows of our SchemaRDD from the variants
+    val sortedVariantsBySampleId: RDD[(String, Array[SampleVariant])] = filteredVariantsBySampleId.map {
+      case (sampleId, variants) =>
+        (sampleId, variants.toArray.sortBy(_.variantId))
+    }
+    val header = DataTypes.createStructType(Array(DataTypes.createStructField("Region", DataTypes.StringType,false)) ++
+      sortedVariantsBySampleId.first()._2.map(variant => {DataTypes.createStructField(variant.variantId.toString,DataTypes.IntegerType,false)}))
+    val rowRDD: RDD[Row] = sortedVariantsBySampleId.map {
+      case (sampleId, sortedVariants) =>
+        val region: Array[String] = Array(panel.getOrElse(sampleId, "Unknown"))
+        val alternateCounts: Array[Int] = sortedVariants.map(_.alternateCount)
+        Row.fromSeq(region ++ alternateCounts)
+    }
+
+    // Create the SchemaRDD from the header and rows and convert the SchemaRDD into a H2O dataframe
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    //val dataFrame=sqlContext.createDataFrame(rowRDD, header)
+    val schemaRDD = sqlContext.applySchema(rowRDD, header)
+    val h2oContext = new H2OContext(sc).start()
+    import h2oContext._
+    val dataFrame1 =h2oContext.asH2OFrame(schemaRDD)
+    val dataFrame=H2OFrameSupport.allStringVecToCategorical(dataFrame1)
+    //val dataFrame = h2oContext.asDataFrame(dataFrame1)(sqlContext)
+    // Split final data table
+//    val keys = Array[String]("training.hex", "test.hex","validation.hex")
+//    val ratios = Array[Double](0.5, 0.3)
+//    val frameSplitter = H2OFrameSupport.splitFrame(dataFrame, keys, ratios)
+    // val (training, test,validation) = (frameSplitter(0), frameSplitter(1),frameSplitter(2))
+    // Split the dataframe into 50% training, 30% test, and 20% validation data
+    val frameSplitter = new FrameSplitter(dataFrame, Array(.5, .3), Array("training", "test", "validation").map(Key.make[Frame](_)), null)
+
+    water.H2O.submitTask(frameSplitter)
+    val splits = frameSplitter.getResult
+    val training = splits(0)
+    val validation = splits(2)
+    // Set the parameters for our deep learning model.
+    val deepLearningParameters = new DeepLearningParameters()
+    deepLearningParameters._train = training._key
+    deepLearningParameters._valid = validation._key
+    deepLearningParameters._response_column = "Region"
+    deepLearningParameters._epochs = 10
+    deepLearningParameters._activation = Activation.RectifierWithDropout
+    deepLearningParameters._hidden = Array[Int](100,100)
+
+    // Train the deep learning model
+    val deepLearning = new DeepLearning(deepLearningParameters)
+    val deepLearningModel = deepLearning.trainModel.get
+
+    // Score the model against the entire dataset (training, test, and validation data)
+    // This causes the confusion matrix to be printed
+    deepLearningModel.score(dataFrame)
+
+  }
+}
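
Reviewer note (not part of the patch): the jump to Spark 1.6.1, Sparkling Water 1.6.5 and
H2O 3.8.2.6 changes two idioms in PopStrat. The Spark rows are now converted with
h2oContext.asH2OFrame, after which the string "Region" column has to be made categorical
before training, and the model parameters now reference H2O frames by key rather than by
value. A minimal Scala sketch of the new flow, condensed from the + side above (it assumes
sqlContext, h2oContext, rowRDD and header are in scope exactly as in the patch):

    import _root_.hex.FrameSplitter
    import _root_.hex.deeplearning.DeepLearningModel.DeepLearningParameters
    import water.Key
    import water.fvec.Frame
    import water.support.H2OFrameSupport

    // Spark rows -> H2O frame; the string response column becomes a categorical (enum) column
    val schemaRDD = sqlContext.applySchema(rowRDD, header)
    val h2oFrame = H2OFrameSupport.allStringVecToCategorical(h2oContext.asH2OFrame(schemaRDD))

    // Split 50/30/20; the unspecified remainder of the ratios goes to the last key
    val splitter = new FrameSplitter(h2oFrame, Array(.5, .3),
      Array("training", "test", "validation").map(Key.make[Frame](_)), null)
    water.H2O.submitTask(splitter)
    val Array(training, test, validation) = splitter.getResult

    // H2O 3.8 parameters hold Key[Frame] references, hence the explicit ._key
    val params = new DeepLearningParameters()
    params._train = training._key
    params._valid = validation._key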
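
On the remaining API deltas: the old assignment deepLearningParameters._train = training
apparently compiled only through the implicit frame-to-key conversions that
import h2oContext._ brings in, so the explicit training._key is clearer as well as what the
newer API expects. Likewise the trailing ('predict) selector is gone from the final score
call; in H2O 3.8, deepLearningModel.score(dataFrame) returns the prediction frame directly,
and (as the in-code comment notes) scoring still prints the confusion matrix.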