diff --git a/tools/import/build.sbt b/tools/import/build.sbt new file mode 100644 index 0000000..b67db6f --- /dev/null +++ b/tools/import/build.sbt @@ -0,0 +1,11 @@ +name := "import-tools" + +organization := "org.precog" + +version := "0.1" + +scalaVersion := "2.9.2" + + +scalacOptions ++= Seq("-unchecked", "-deprecation") + diff --git a/tools/import/common/build.sbt b/tools/import/common/build.sbt new file mode 100644 index 0000000..3e94c51 --- /dev/null +++ b/tools/import/common/build.sbt @@ -0,0 +1,18 @@ +name := "import-common" + +organization := "org.precog" + +version := "0.1" + +scalaVersion := "2.9.2" + +resolvers ++= Seq( + "ReportGrid (public)" at "http://nexus.reportgrid.com/content/repositories/public-releases" +) + +libraryDependencies ++= Seq( + "com.reportgrid" %% "blueeyes-core" % "1.0.0-M8.1", + "com.reportgrid" %% "blueeyes-json" % "1.0.0-M8.1" + ) + + diff --git a/tools/import/common/project/Build.scala b/tools/import/common/project/Build.scala new file mode 100644 index 0000000..4ef5d5c --- /dev/null +++ b/tools/import/common/project/Build.scala @@ -0,0 +1,5 @@ +import sbt._ +object CommonProj extends Build +{ + lazy val root = Project("import-common", file(".")) +} diff --git a/tools/import/common/project/plugins.sbt b/tools/import/common/project/plugins.sbt new file mode 100644 index 0000000..3ccee0c --- /dev/null +++ b/tools/import/common/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.7") \ No newline at end of file diff --git a/tools/import/common/src/main/scala/com/precog/tools/importers/common/ConsoleUtils.scala b/tools/import/common/src/main/scala/com/precog/tools/importers/common/ConsoleUtils.scala new file mode 100644 index 0000000..db2d1b0 --- /dev/null +++ b/tools/import/common/src/main/scala/com/precog/tools/importers/common/ConsoleUtils.scala @@ -0,0 +1,62 @@ +package com.precog.tools.importers.common + +import annotation.tailrec + +/** + * User: gabriel + * Date: 1/25/13 + */ +object ConsoleUtils { + + @tailrec + def selectSet[T](label:String, available: Seq[T], selected: Seq[T]=List()): Seq[T] = + if (available.isEmpty) selected + else { + + println("Available %ss:".format(label)) + println(present(available)) + + println("Selected %ss:".format(label)) + println(present(selected)) + + println("Select a %s by entering its number or name, 0 to select all, or press enter to continue: ".format(label)) + + val selIdx = readLine() + selIdx match { + case "" => selected + case ParseInt(0) => available + case ParseInt(x) if x>0 && x<=available.size => { + val elem:T = available(x - 1) + selectSet(label,available.filterNot(_==elem), selected:+elem) + } + case s:String if (available.exists(_.toString == s)) => { + val elem:T =available.find(_.toString == s).get + selectSet(label,available.filterNot(_==elem), selected:+elem) + } + case _ => selectSet(label,available, selected) + } + } + + @tailrec + def selectOne[T](label:String, available: Seq[T]): T ={ + + println("Available %ss:".format(label)) + println(present(available)) + + println("Select one %s by entering its number or name: ".format(label)) + + val selIdx = readLine() + selIdx match { + case ParseInt(x) if x>0 && x<=available.size => available(x - 1) + case s:String => available.find(_.toString == s) match { + case Some(t) => t + case None => selectOne(label,available) + } + case _ => selectOne(label,available) + } + } + + def present[T](arr:Seq[T])= arr.zipWithIndex.map({ case (a, b) => (b+1) + ":" + a }).mkString(", ") + + +} diff --git 
a/tools/import/common/src/main/scala/com/precog/tools/importers/common/Ingest.scala b/tools/import/common/src/main/scala/com/precog/tools/importers/common/Ingest.scala new file mode 100644 index 0000000..c95384a --- /dev/null +++ b/tools/import/common/src/main/scala/com/precog/tools/importers/common/Ingest.scala @@ -0,0 +1,61 @@ +package com.precog.tools.importers.common + +import akka.dispatch._ +import akka.dispatch.Future +import scalaz.Monad + +import blueeyes.json._ +import blueeyes.core.data.DefaultBijections._ +import blueeyes.core.service._ +import blueeyes.core.service.engines.HttpClientXLightWeb +import blueeyes.bkka.FutureMonad +import scalaz.StreamT +import java.nio.ByteBuffer +import blueeyes.core.http.HttpResponse +import blueeyes.core.data.ByteChunk +import scala.Right +import org.slf4j.LoggerFactory + +/** + * User: gabriel + * Date: 3/21/13 + */ +object Ingest { + + private lazy val logger = LoggerFactory.getLogger("com.precog.tools.importers.common.Ingest") + + def sendToPrecog(host:String, path:String, apiKey:String, dataStream:StreamT[Future,ByteBuffer], streaming:Boolean = true)(implicit executor:ExecutionContext): Future[HttpResponse[ByteChunk]] = { + implicit val M = new FutureMonad(executor) + val httpClient = new HttpClientXLightWeb()(executor) + + dataStream.isEmpty.flatMap( isEmpty => + if (isEmpty) { + logger.info("No need to send an empty data stream") + Future(HttpResponse.empty) + } else { + val byteChunks: ByteChunk = Right(dataStream) + //val fullPath = "%s/ingest/v1/fs%s".format(host, path) + val fullPath = "%s/fs%s".format(host, path) //local test only + val ingestParams = ('apiKey -> apiKey)::( if (streaming) List('mode -> "streaming") else List('mode -> "batch", 'receipt -> "true")) + logger.info("Ingesting to %s".format(path)) + httpClient.parameters(ingestParams:_*).header("Content-Type","application/json").post(fullPath)(byteChunks) + } + ) + } + + def callSucceeded(response:HttpResponse[ByteChunk]){ + response match { + case HttpResponse(_ ,_,Some(Left(buffer)),_) => logger.info("Result: %s".format(new String(buffer.array(), "UTF-8"))) + case _ => logger.error("Unexpected stream in %s".format(response)) + } + } + + + def toByteStream(dataStream: StreamT[Future, JValue])(implicit m:Monad[Future]): StreamT[Future, ByteBuffer] = { + dataStream.map(jv => ByteBuffer.wrap({ + val js = "%s\n".format(jv.renderCompact) + logger.trace("to bytes = %s".format(js.replace('\n',' '))) + js + }.getBytes("UTF-8"))) + } +} diff --git a/tools/import/common/src/main/scala/com/precog/tools/importers/common/package.scala b/tools/import/common/src/main/scala/com/precog/tools/importers/common/package.scala new file mode 100644 index 0000000..e97bdc4 --- /dev/null +++ b/tools/import/common/src/main/scala/com/precog/tools/importers/common/package.scala @@ -0,0 +1,18 @@ +package com.precog.tools.importers + +/** + * User: gabriel + * Date: 1/25/13 + */ +package object common { + + object ParseInt{ + def unapply(s : String) : Option[Int] = try { + Some(s.toInt) + } catch { + case _ : java.lang.NumberFormatException => None + } + } + + +} diff --git a/tools/import/jdbc/build.sbt b/tools/import/jdbc/build.sbt index 625d5c4..c4ed52d 100644 --- a/tools/import/jdbc/build.sbt +++ b/tools/import/jdbc/build.sbt @@ -1,5 +1,7 @@ name := "import-jdbc" +organization := "org.precog" + version := "0.1" scalaVersion := "2.9.2" @@ -17,9 +19,12 @@ resolvers ++= Seq( scalacOptions ++= Seq("-unchecked", "-deprecation") +assemblySettings + libraryDependencies ++= Seq( - "com.reportgrid" % 
"blueeyes-core_2.9.2" % "1.0.0-M6", - "com.reportgrid" % "blueeyes-json_2.9.2" % "1.0.0-M6", + "com.reportgrid" % "blueeyes-core_2.9.2" % "1.0.0-M8.1", + "com.reportgrid" % "blueeyes-json_2.9.2" % "1.0.0-M8.1", "org.specs2" %% "specs2" % "1.12.2" , - "com.h2database" % "h2" % "1.2.134" % "test" + "com.h2database" % "h2" % "1.2.134" % "test", + "ch.qos.logback" % "logback-classic" % "1.0.0" ) diff --git a/tools/import/jdbc/project/Build.scala b/tools/import/jdbc/project/Build.scala new file mode 100644 index 0000000..ed9f1b3 --- /dev/null +++ b/tools/import/jdbc/project/Build.scala @@ -0,0 +1,9 @@ +import sbt._ +object JdbcImportProj extends Build +{ + lazy val root = + Project("import-jdbc", file(".")) dependsOn(common) + lazy val common = + ProjectRef(uri("../common/"), "import-common") +} + diff --git a/tools/import/jdbc/project/plugins.sbt b/tools/import/jdbc/project/plugins.sbt new file mode 100644 index 0000000..3ccee0c --- /dev/null +++ b/tools/import/jdbc/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.7") \ No newline at end of file diff --git a/tools/import/jdbc/src/main/resources/application.conf b/tools/import/jdbc/src/main/resources/application.conf new file mode 100644 index 0000000..85664e8 --- /dev/null +++ b/tools/import/jdbc/src/main/resources/application.conf @@ -0,0 +1,8 @@ +blueeyes-async { + name = "DefaultActorPool" + keep-alive-time = 5s + core-pool-size-factor = 1.0 + core-pool-size-max = 8 + max-pool-size-factor = 1.0 + max-pool-size-max = 8 +} \ No newline at end of file diff --git a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/DbAccess.scala b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/DbAccess.scala index c4d0922..f8c785c 100644 --- a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/DbAccess.scala +++ b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/DbAccess.scala @@ -2,12 +2,19 @@ package com.precog.tools.importers.jdbc import java.sql._ import Datatypes._ +import scalaz.StreamT +import annotation.tailrec +import scalaz.effect.IO +import org.slf4j.LoggerFactory /** * User: gabriel * Date: 11/30/12 */ object DbAccess { + + private lazy val logger = LoggerFactory.getLogger("com.precog.tools.importers.jdbc.DbAccess") + def columnCount(stmt:PreparedStatement)=stmt.getMetaData.getColumnCount def getColumns(conn:Connection, table:Table):IndexedSeq[Column]={ @@ -26,24 +33,31 @@ object DbAccess { for ( i <- 1 to count) yield Column(tblMetaData.getColumnName(i),Table(tblMetaData.getTableName(i))) } - def rsIterator[T](rs:ResultSet)(f:ResultSet => T) = new Iterator[T] { - def hasNext = rs.next() - def next():T = f(rs) + + + def rsList[T](rs:ResultSet)(f:ResultSet => T)={ + + @tailrec + def buildList(rs:ResultSet, acc:List[T]=Nil):List[T]= + if (rs.next()) buildList(rs, f(rs)::acc) + else acc + + buildList(rs).reverse } - def oneColumnRs(rs:ResultSet) = rsIterator(rs)(rs=> rs.getString(1)) - def tables(rs:ResultSet) = rsIterator(rs)(rs=> Table(rs.getString("TABLE_NAME"))) - def columns(rs:ResultSet) = rsIterator(rs)(rs=> Column(rs.getString("COLUMN_NAME"), Table(rs.getString("TABLE_NAME")))) - def relationshipDesc(rs:ResultSet) = rsIterator(rs)( + def rsStreamT[T](rs:ResultSet)(f:ResultSet => T)=StreamT.unfoldM(rs)( + (rs:ResultSet) => IO( { val d=if (rs.next()) { Some(f(rs),rs)} else None; logger.info("read stream = %s".format(d)); d })) + + def rsStream[T](rs:ResultSet)(f:ResultSet => T):Stream[T] = if (rs.next) f(rs) #:: rsStream(rs)(f) else Stream.empty + 
+ def oneColumnRs(rs:ResultSet) =rsList(rs)(rs=> rs.getString(1)) + def tables(rs:ResultSet) = rsList(rs)(rs=> Table(rs.getString("TABLE_NAME"))) + def columns(rs:ResultSet) = rsList(rs)(rs=> Column(rs.getString("COLUMN_NAME"), Table(rs.getString("TABLE_NAME")))) + def relationshipDesc(rs:ResultSet) = rsList(rs)( rs=> PkFkRelation( Key(Table(rs.getString("PKTABLE_NAME")),rs.getString("PKCOLUMN_NAME")), Key(Table(rs.getString("FKTABLE_NAME")),rs.getString("FKCOLUMN_NAME")), rs.getInt("KEY_SEQ") ) ) - - def allSet(rs:ResultSet) = { - val count= rs.getMetaData.getColumnCount - rsIterator(rs)(rs=> for ( i <- 1 to count) yield rs.getString(i) ) - } } diff --git a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/DbAnalysis.scala b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/DbAnalysis.scala index 41af715..cc023af 100644 --- a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/DbAnalysis.scala +++ b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/DbAnalysis.scala @@ -18,7 +18,7 @@ object DbAnalysis{ def findTables(metadata: DatabaseMetaData, oCat: Option[String], tableName: => Option[String]): Array[Table] = { - val cat= toNullUppercase(oCat) + val cat= oCat.getOrElse(null) val tableNm= tableName.map(_.toUpperCase).getOrElse(null) tables(metadata.getTables(cat, null, tableNm, Array("TABLE"))).toArray } diff --git a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbc.scala b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbc.scala index 8ff1bd5..da96741 100644 --- a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbc.scala +++ b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbc.scala @@ -3,11 +3,21 @@ package com.precog.tools.importers.jdbc import java.sql._ import blueeyes.json._ import blueeyes.core.data.DefaultBijections._ -import blueeyes.core.service._ import blueeyes.bkka.AkkaDefaults.defaultFutureDispatch import scala.Some import blueeyes.core.service.engines.HttpClientXLightWeb import Datatypes._ +import blueeyes.bkka.FutureMonad +import scalaz.{Monad, StreamT,Hoist, ~>} +import akka.dispatch.ExecutionContext +import java.nio.ByteBuffer +import blueeyes.core.http.HttpResponse +import blueeyes.core.data.ByteChunk +import akka.dispatch.Future +import com.precog.tools.importers.common.Ingest._ +import DbAccess._ +import scalaz.effect.IO +import org.slf4j.LoggerFactory /** * User: gabriel @@ -15,7 +25,9 @@ import Datatypes._ */ object ImportJdbc { - import DbAccess._ + val httpClient=new HttpClientXLightWeb()(defaultFutureDispatch) + + private lazy val logger = LoggerFactory.getLogger("com.precog.tools.importers.jdbc.ImportJdbc") case class ImportTable(name:String, columns:Seq[String], baseOrJoin:Either[Table,Join]){ val isCollection = baseOrJoin.right.toOption.map(_.exported).getOrElse(false) } case class IngestInfo(tables:Seq[ImportTable]) @@ -34,53 +46,63 @@ object ImportJdbc { def buildSort(ingestInfo:IngestInfo) =ingestInfo.tables.flatMap( t => t.columns.map("%s.%s".format(t.name,_)) ) + def getElements(o:Option[JValue]):List[JValue]= o match { + case Some(l:JArray) => l.elements + case _ => Nil + } - def mkPartialJson(baseName:String, ingestInfo:IngestInfo, s: Seq[String], prevMap:Map[String,JValue]=Map())= { + def buildField( nm: (String,String)) =Option(nm._2).map( s=>JField(nm._1,JString(s))) - def getElements(o:Option[JValue]):List[JValue]= o match { - case Some(l:JArray) => l.elements - case _ => Nil - } - def 
toJObject(o:JValue):JObject= o match { - case j:JObject => j - case _ => sys.error("base value is not jobject!") - } + type StrJVMap= Map[String,JValue] - def buildJValues( ms:(Map[String,JValue],Seq[String]), tblDesc: ImportTable ):(Option[(String,JValue)],Seq[String])={ - val (m,s)=ms - val (tblColValues,rest)=s.splitAt(tblDesc.columns.length) - val objValues =(tblDesc.columns.zip(tblColValues)).flatMap(buildField(_) ).toList - val tblName = tblDesc.name - val keyValue=if (objValues.isEmpty) if (tblDesc.isCollection) Some(tblName->JArray.empty) else None + def objFields( map:StrJVMap, s:Seq[String], tblDesc: ImportTable ):(Option[(String,JValue)],Seq[String])={ + val (tblColValues,rest)=s.splitAt(tblDesc.columns.length) + val objValues =(tblDesc.columns.zip(tblColValues)).flatMap(buildField(_) ) + val tblName = tblDesc.name.toUpperCase + val keyValue= + if (objValues.isEmpty) if (tblDesc.isCollection) Some(tblName->JArray.empty) else None else { - val data=JObject(objValues) - val obj= if (tblDesc.isCollection) JArray(data:: getElements(m.get(tblName)) ) else data + val data=JObject(objValues:_*) + val obj= if (tblDesc.isCollection) JArray(getElements(map.get(tblName)):+data ) else data Some(tblName->obj) } - (keyValue,rest) + (keyValue,rest) + } + def mkJson[M[+_]](baseName:String, ingestInfo:IngestInfo, row: Seq[String], outStream:StreamT[M,JValue], currentObj:StrJVMap=Map())(implicit M:Monad[M]):(StreamT[M,JValue],StrJVMap) ={ + val baseNameUC=baseName.toUpperCase + val singleObjMap=buildJsonObjMap(ingestInfo, Map(),row) + if (currentObj.isEmpty || singleObjMap.get(baseNameUC) == currentObj.get(baseNameUC)){ + val objM=buildJsonObjMap(ingestInfo, currentObj, row) + (outStream, objM) + } else { + val newObj= buildJObject(baseNameUC, currentObj) + (newObj::outStream,singleObjMap) } + } - def buildField( nm: (String,String)) =Option(nm._2).map( s=>JField(nm._1,JString(s))) - - val jsonMap:Map[String,JValue]=ingestInfo.tables.foldLeft( (prevMap,s) )( - (ms,v) =>{ - val (opt,r)= buildJValues(ms,v) - val (m,_)=ms - opt.map( (kobj)=>{ - val (k,obj) =kobj - if (k!=baseName) - (m+(kobj),r) - else if (prevMap.isEmpty || prevMap(k)!= obj) - (Map(kobj),r) - else (m,r) - }).getOrElse((m,r)) - } )._1 - - val base:JObject = toJObject(jsonMap(baseName)) - val values:List[JField] = (jsonMap-baseName).map(nv => JField(nv._1, nv._2)).toList - (JObject(base.fields ++ values),jsonMap) + + private def buildJObject(baseNameUC: String, currentObj: StrJVMap): JObject = { + val base = (currentObj(baseNameUC)) --> classOf[JObject] + val values = (currentObj - baseNameUC) + val newObj = JObject(base.fields ++ values) + newObj } + def buildJsonObjMap(ingestInfo: ImportJdbc.IngestInfo, prevMap: ImportJdbc.StrJVMap, s: Seq[String]): StrJVMap = { + ingestInfo.tables.foldLeft((prevMap, s))({ + case ((m,seq), v) => { + val (opt, r): (Option[(String, JValue)], Seq[String]) = objFields(m, seq, v) //build a json object from the seq values + opt.map(kv => (m + kv, r)).getOrElse((m, r)) + }})._1 + } + + def buildBody(data: StreamT[IO,Seq[String]], baseTable: String, i: IngestInfo)(implicit executor: ExecutionContext, m:FutureMonad, io:Monad[IO]): Future[StreamT[Future,JValue]] ={ + Future(data.foldLeft((StreamT.empty[Future,JValue], Map():StrJVMap))( + { case ((os,currentMap),row)=>mkJson(baseTable,i,row,os,currentMap) } + ).map( { case (strm,obj)=> + buildJObject(baseTable.toUpperCase,obj)::strm + } ).unsafePerformIO()) + } def names(cs:Seq[Column])=cs.map(_.name) @@ -92,33 +114,28 @@ object ImportJdbc { "select %s from 
%s order by %s".format(colSelect,join,sort) } - def executeQuery(connDb: Connection, query: String ): (Iterator[IndexedSeq[String]],IndexedSeq[Column]) = { + def executeQuery(connDb: Connection, query: String ): (StreamT[IO,IndexedSeq[String]],IndexedSeq[Column]) = { val stmt = connDb.prepareStatement(query) val columns = getColumns(stmt) val rs = stmt.executeQuery() - (rsIterator(rs)(row => for (i <- 1 to columns.size) yield row.getString(i)),columns) + (rsStreamT(rs)(row => for (i <- 1 to columns.size) yield row.getString(i)),columns) } - def getConnection(dbUrl: String, user: String, password: String): Connection = { - DriverManager.getConnection(dbUrl, user, password) + def getConnection(dbUrl: String, user: String, password: String, database:Option[String]): Connection = { + val uri= database.map( dbName=>if (dbUrl.endsWith(dbName)) dbUrl else "%s%s".format(dbUrl,dbName)).getOrElse(dbUrl) + DriverManager.getConnection(uri, user, password) } - def ingest(connDb: Connection, objName:String, query: String, oTblDesc:Option[IngestInfo], ingestPath: =>String, host: =>String, apiKey: =>String) = { + def ingest(connDb: Connection, objName: String, query: String, oTblDesc:Option[IngestInfo], ingestPath: =>String, host: =>String, apiKey: =>String)(implicit executor: ExecutionContext):Future[HttpResponse[ByteChunk]] = { + implicit val M = new FutureMonad(executor) val (data,columns) = executeQuery(connDb, query) val tblDesc= oTblDesc.getOrElse(IngestInfo(Seq(ImportTable(objName,names(columns),Left(Table("base")))))) - val body = buildBody(data, objName, tblDesc) - val fullPath = "%s/ingest/v1/sync/fs%s/%s".format(host, ingestPath,objName) - val httpClient=new HttpClientXLightWeb()(defaultFutureDispatch) - //TODO add owner account id - httpClient.parameters('apiKey -> apiKey).post(fullPath)(jvalueToChunk(body)) + + val path= "%s/%s".format(ingestPath,objName) + val dataStream:Future[StreamT[Future,ByteBuffer]]= buildBody(data, objName, tblDesc).map(toByteStream(_)) + dataStream.flatMap(sendToPrecog(host,path,apiKey,_)) } - def buildBody(data: Iterator[IndexedSeq[String]], baseTable: String, i: IngestInfo): JArray = - JArray(data.foldLeft((List[JValue](), Map[String, JValue]()))((lm, r) => { - val (l, m) = lm - val (values, map) = mkPartialJson(baseTable, i, r, m) - (values :: l, map) - })._1) } diff --git a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbcConsole.scala b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbcConsole.scala index 9e6a55e..2bbbca3 100644 --- a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbcConsole.scala +++ b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbcConsole.scala @@ -1,11 +1,20 @@ package com.precog.tools.importers.jdbc -import annotation.tailrec -import java.sql.{Connection, DatabaseMetaData, DriverManager} +import java.sql.{Connection, DatabaseMetaData} import DbAccess._ import DbAnalysis._ import ImportJdbc._ -import Datatypes._ +import blueeyes.bkka.AkkaDefaults._ +import scala.Left +import com.precog.tools.importers.jdbc.Datatypes.Join +import com.precog.tools.importers.jdbc.ImportJdbc.IngestInfo +import scala.Some +import scala.Right +import com.precog.tools.importers.jdbc.ImportJdbc.ImportTable +import com.precog.tools.importers.jdbc.Datatypes.Table +import com.precog.tools.importers.common.ConsoleUtils._ +import org.slf4j.LoggerFactory +import com.precog.tools.importers.common.Ingest._ /** * User: gabriel @@ -13,59 +22,75 @@ import Datatypes._ */ 
object ImportJdbcConsole { + private lazy val logger = LoggerFactory.getLogger("com.precog.tools.importers.jdbc.ImportJdbcConsole") + + implicit val as=actorSystem + + Option(System.getProperty("jdbc.driver")).map(driver => Class.forName(driver)) + def main(args:Array[String]){ println("Welcome to Precog JDBC import wizard") lazy val dbUrl=readLine("Enter database URL:") lazy val user=readLine("User:") lazy val password = readLine("Password:") // use api key and dispatch to call ingest - lazy val host="http://beta.precog.com" //readLine("ingestion host") //TODO move to trait ? + lazy val host=readLine("Precog ingestion host:") lazy val apiKey=readLine("API KEY for ingestion") lazy val basePath=readLine("Base ingestion path ( /{userId}/....)") + importJdbc(dbUrl,user,password, host, apiKey, basePath) + as.shutdown() } - def importJdbc(dbUrl: =>String, user: =>String, password: =>String, host: =>String, apiKey: =>String, basePath: =>String) { + def importJdbc(dbUrl: =>String, user: =>String, password: =>String, host: =>String, apiKey: =>String, basePath: =>String):Unit={ - val conn= getConnection(dbUrl, user, password) - val metadata= conn.getMetaData - val cat= getCatalogs(metadata) + val catConn= getConnection(dbUrl, user, password,None) + val cat= getCatalogs(catConn.getMetaData) //for querying tables, the connection must be specific to a database - val connDb= DriverManager.getConnection("%s%s".format(dbUrl,cat),user,password) - val tqs = getQuery(connDb, metadata, cat) + val connDb= getConnection(dbUrl, user, password,Some(cat)) + val tqs = getQuery(connDb, cat) tqs.map( tqs => { val (table,tDesc,q) = tqs val path= "%s/%s".format(basePath, table) - println(ingest(connDb,table, q, tDesc, path, host, apiKey)) + logger.info("Importing %s".format(table)) + ingest(connDb,table, q, tDesc, path, host, apiKey).onComplete { + case Right(result) => callSucceeded(result) + case Left(failure) => logger.error("Failed to import %s, error: %s".format(table,failure.getMessage)) + } }) } def getCatalogs(metadata: DatabaseMetaData): String = { println("Catalogs:") - val catalogs = oneColumnRs(metadata.getCatalogs).toArray - println(present(catalogs)) - catalogs({println("Select a catalog: ");readInt()-1}) + val catalogs = oneColumnRs(metadata.getCatalogs) + selectOne("Catalog/Database",catalogs) } def selectColumns(connDb: Connection, table: Table): List[String] = { val labels = names(getColumns(connDb, "select * from %s".format(table.name))) //column selection - println("table: %s".format(table.name)) + println("Table: %s".format(table.name.toUpperCase)) selectSet("column", labels).toList } - def getQuery(connDb: Connection, metadata: DatabaseMetaData, cat: String): Seq[(String,Option[IngestInfo],String)] = { + def getQuery(connDb: Connection, cat: String): Seq[(String,Option[IngestInfo],String)] = { if (readLine("Do you have a SQL query to select the data? (y/N)").toLowerCase == "y") { List((readLine("table/object name: "),None,readLine("Query="))) - } else createQueries(connDb, metadata, cat, selectedTables(findTables(metadata, Some(cat), readTableName())), readLine("Denormalize related tables? (y/n)").toLowerCase == "y") + } else { + val tblName=readTableName() + val metadata= connDb.getMetaData + val tables=findTables(metadata, Some(cat), tblName) + createQueries(connDb, metadata, cat, selectedTables(tables), readLine("Denormalize related tables? 
(y/n)").toLowerCase == "y") + } } def createQueries(conn:Connection, metadata: DatabaseMetaData, cat: String, selected: Seq[Table],denormalize: => Boolean): Seq[(String,Option[IngestInfo],String)] = { selected.map( table =>{ - val allRelationships = relationships( conn, metadata, Some(cat),table).toSeq + val allRelationships = relationships( conn, metadata, None,table).toSeq val relations= selectSet("relation",allRelationships).toList val tblDesc=buildIngestInfo(table, conn, relations) @@ -94,46 +119,11 @@ object ImportJdbcConsole { } - //case class ImportTable(name:String, columns:Seq[String], baseOrJoin:Either[Table,Join]){ val isCollection = baseOrJoin.right.toOption.map(_.exported).getOrElse(false) } - //case class IngestInfo(tables:Seq[ImportTable]) - - def selectedTables(tablesList: Array[Table]): Seq[Table] = { selectSet("table", tablesList) } - @tailrec - private def selectSet[T](label:String, available: Seq[T], selected: Seq[T]=List())(implicit arg0: ClassManifest[T]): Seq[T] = - if (available.isEmpty) selected - else { - val availArray=available.toArray - - println("Available %ss:".format(label)) - println(present(availArray)) - - println("Selected %ss:".format(label)) - println(present(selected)) - - println("Select a number/enter the name, 0 to select all, or enter to continue: ") - - val selIdx = readLine() - selIdx match { - case "" => selected - case ParseInt(0) => available - case ParseInt(x) if (x<=available.size) => { - val elem:T = availArray(x - 1) - selectSet(label,available.filterNot(_==elem), selected:+elem) - } - case s:String if (available.exists(_.toString == s)) => { - val elem:T =availArray.find(_.toString == s).get - selectSet(label,available.filterNot(_==elem), selected:+elem) - } - case _ => selectSet(label,available, selected) - } - } - - - def present[T](arr:Seq[T])= (1 to arr.length).zip(arr).map(x=>x._1 +":"+ x._2).mkString(", ") def show(baseTable:Table,set: Set[Join])= set.map( r=> " %s with %s on %s=%s".format(baseTable.name, r.refKey.table, r.baseColName,r.refKey.columnName )).mkString(", ") def readTableName()= { diff --git a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbcService.scala b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbcService.scala index 605d38f..446974b 100644 --- a/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbcService.scala +++ b/tools/import/jdbc/src/main/scala/com/precog/tools/importers/jdbc/ImportJdbcService.scala @@ -1,8 +1,8 @@ package com.precog.tools.importers.jdbc -import akka.dispatch.Future -import blueeyes.BlueEyesServiceBuilder +import akka.dispatch.{ExecutionContext, Future} +import blueeyes.{BlueEyesServer, BlueEyesServiceBuilder} import blueeyes.core.http.{HttpRequest, HttpResponse, HttpStatus} import blueeyes.core.http.HttpStatusCodes._ import blueeyes.core.data.DefaultBijections._ @@ -12,8 +12,15 @@ import DbAnalysis._ import ImportJdbc._ import JsonImplicits._ import java.sql.{DatabaseMetaData, Connection} -import blueeyes.json.{JValue, JString, JArray} -import Datatypes._ +import blueeyes.json.{JString, JArray} +import scala.Left +import com.precog.tools.importers.jdbc.Datatypes.Join +import com.precog.tools.importers.jdbc.ImportJdbc.IngestInfo +import scala.Right +import scala.Some +import com.precog.tools.importers.jdbc.ImportJdbc.ImportTable +import com.precog.tools.importers.jdbc.Datatypes.Table +import 
scalaz.Monad /** @@ -22,8 +29,10 @@ import Datatypes._ */ trait ImportJdbcService extends BlueEyesServiceBuilder { + implicit def executionContext: ExecutionContext + implicit def M: Monad[Future] - val host="http://beta.precog.com" //TODO move to trait + val host=System.getProperty("host") def handleRequest[T](f: HttpRequest[T]=> Future[HttpResponse[T]])= (request: HttpRequest[T]) => @@ -35,25 +44,16 @@ trait ImportJdbcService extends BlueEyesServiceBuilder { def withConnectionFromRequest[T](r:HttpRequest[T])(f: (Connection,HttpRequest[T])=> Future[HttpResponse[T]])= { val dbUrl = r.parameters('dbUrl) - val database= r.parameters.get('database).getOrElse("") + val database= r.parameters.get('database) val user = r.parameters.get('user).getOrElse(null) val pwd = r.parameters.get('password).getOrElse(null) - val uri= if (dbUrl.endsWith(database)) dbUrl else "%s%s".format(dbUrl,database) - val c=getConnection(uri, user, pwd) - try { - f(c,r) - } finally { - c.close() - } + val c=getConnection(dbUrl, user, pwd,database) + f(c,r).flatMap(x=>Future({c.close();x})) } def handleRequestWithConnection[T](f: (Connection,HttpRequest[T])=> Future[HttpResponse[T]])= handleRequest( (r: HttpRequest[T]) => withConnectionFromRequest(r)(f)) def optionYes(ob:Option[String])=ob.map(_.toLowerCase == "y").getOrElse(false) - /*def response[T](f: HttpRequest[T] => HttpResponse[T] )(request: HttpRequest[T]):Future[HttpResponse[T]] = { - - - }*/ def getJoins(infer: Boolean, conn: Connection, metadata: DatabaseMetaData, cat: Option[String], table: Table, idPattern: String, sample: Boolean): Set[Join] = { val inferred = if (infer) getInferredRelationships(conn, metadata, cat, table, idPattern, sample) else Set() @@ -68,7 +68,7 @@ trait ImportJdbcService extends BlueEyesServiceBuilder { - val importService = service("JdbcImportService", "1.0.0") { context => + val importService = service("JdbcImportService", "1.0") { context => startup { Future { () } } -> @@ -77,9 +77,9 @@ trait ImportJdbcService extends BlueEyesServiceBuilder { path("/databases" ) { get { handleRequestWithConnection( (conn:Connection,request:HttpRequest[ByteChunk]) =>{ - val tables=JArray(oneColumnRs(conn.getMetaData.getCatalogs).map(JString(_)).toList) + val tables=JArray(oneColumnRs(conn.getMetaData.getCatalogs).map(JString(_))) Future { - HttpResponse[ByteChunk](content = Option(tables)) + HttpResponse[ByteChunk](content = Option(jvalueToChunk(tables))) } } ) @@ -89,11 +89,11 @@ trait ImportJdbcService extends BlueEyesServiceBuilder { path("/databases" / 'database / "tables" ) { get { handleRequestWithConnection( (conn:Connection,request:HttpRequest[ByteChunk]) => { - val cat = request.parameters.get('database) + val cat = request.parameters.get('database).map(_.toUpperCase ) val ts=findTables(conn.getMetaData,cat,None) val result = JArray(ts.map(t=>JString(t.name)).toList) Future { - HttpResponse[ByteChunk](content = Option(result)) + HttpResponse[ByteChunk](content = Option(jvalueToChunk(result))) } } ) @@ -150,15 +150,17 @@ trait ImportJdbcService extends BlueEyesServiceBuilder { }~ path('database / "table" / 'table / "config") { post { - handleRequestWithConnection( (conn:Connection,request:HttpRequest[ByteChunk]) => { + handleRequest( (request:HttpRequest[ByteChunk]) => { val apiKey= request.parameters('apiKey) val path= request.parameters('path) val table= Table(request.parameters('table)) val cToJ=chunkToFutureJValue request.content.map(cToJ(_)).map(_.flatMap( ingestInfo =>{ + withConnectionFromRequest(request)( (conn:Connection,_)=>{ val 
query = buildQuery(ingestInfo) ingest(conn,table.name, query, Some(ingestInfo), path, host, apiKey) - })).get + }) + })).getOrElse(Future{ HttpResponse[ByteChunk](content = None) }) }) } } diff --git a/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/DbAnalysisTest.scala b/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/DbAnalysisSpec.scala similarity index 84% rename from tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/DbAnalysisTest.scala rename to tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/DbAnalysisSpec.scala index 5789fdb..f5f0aea 100644 --- a/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/DbAnalysisTest.scala +++ b/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/DbAnalysisSpec.scala @@ -3,12 +3,25 @@ package com.precog.tools.importers.jdbc import org.specs2.mutable.Specification import DbAnalysis._ import Datatypes._ +import com.precog.tools.importers.jdbc.Datatypes.Table +import com.precog.tools.importers.jdbc.Datatypes.Join /** * User: gabriel * Date: 12/4/12 */ -class DbAnalysisTest extends Specification { +class DbAnalysisSpec extends Specification { + + + "find tables" should { + + + "find all tables" in new Conn { val dbName="tables" + tblA; tblB; tblC; tblD + findTables(conn.getMetaData,None,None) must_== Array(Table("A"),Table("B"),Table("C"),Table("D")) + } + + } "declared relations" should { "identify one to many" in new Conn{ val dbName ="onemany" diff --git a/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcServiceTest.scala b/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcServiceSpec.scala similarity index 75% rename from tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcServiceTest.scala rename to tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcServiceSpec.scala index 07eafb0..3c9ae95 100644 --- a/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcServiceTest.scala +++ b/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcServiceSpec.scala @@ -6,27 +6,32 @@ import blueeyes.core.http.test.HttpRequestMatchers import blueeyes.core.service._ import blueeyes.core.data.DefaultBijections._ import java.sql.DriverManager -import akka.dispatch.Await +import akka.dispatch.{Future, Await} import blueeyes.core.http.HttpResponse import JsonImplicits._ +import scalaz.Monad +import blueeyes.bkka.{AkkaDefaults, FutureMonad} /** * User: gabriel * Date: 12/4/12 */ -class ImportJdbcServiceTest extends BlueEyesServiceSpecification with ImportJdbcService with HttpRequestMatchers { +class ImportJdbcServiceSpec extends BlueEyesServiceSpecification with ImportJdbcService with HttpRequestMatchers with AkkaDefaults { val executionContext = defaultFutureDispatch + implicit val M: Monad[Future] = new FutureMonad(executionContext) - def dbUrl(db:String)="jdbc:h2:~/%s".format(db) + val servicePrefix="/JdbcImportService/v1" + override val host="https://devapi.precog.com" + def dbUrl(db:String)="jdbc:h2:~/%s".format(db) "Database metadata" should { "get database metadata" in new Conn{ val dbName ="TESTSVC" tblA - val r=client.parameters('dbUrl-> dbUrl(dbName)).get[ByteChunk]("/metadata/databases") + val r=client.parameters('dbUrl-> dbUrl(dbName)).get[ByteChunk](servicePrefix+"/metadata/databases") Await.result(r,1 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== """["TESTSVC"]""" } 
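Stepping back to `mkJson` and `buildJsonObjMap` in ImportJdbc above: they rebuild nested JSON from the flat, ordered rows of the denormalizing join. While the base-table columns repeat, child rows accumulate into a `JArray`; as soon as they change, the finished parent object is pushed onto the output stream. A sketch over the Id monad, mirroring the spec fixtures (runnable only with the patch's scalaz/blueeyes dependencies on the classpath):

```scala
import blueeyes.json._
import scalaz.{Id, StreamT}
import com.precog.tools.importers.jdbc.Datatypes._
import com.precog.tools.importers.jdbc.ImportJdbc
import com.precog.tools.importers.jdbc.ImportJdbc.{ImportTable, IngestInfo}

object MkJsonSketch extends App {
  // Parent/child layout as in the spec: a base table plus one exported join.
  val desc = IngestInfo(List(
    ImportTable("parent", List("ID", "name"), Left(Table("Parent"))),
    ImportTable("child", List("ID", "name", "P_ID"),
      Right(Join("id", Key(Table("child"), "parent_id"), ExportedKey)))))

  val rows = List(
    List("1", "parent1", "1", "child1", "1"), // parent 1, first child
    List("1", "parent1", "2", "child2", "1"), // same base columns: child appended
    List("2", "parent2", null, null, null))   // base columns changed: parent 1 emitted

  val (out, pending) =
    rows.foldLeft((StreamT.empty[Id.Id, JValue], Map.empty[String, JValue])) {
      case ((os, cur), row) => ImportJdbc.mkJson("parent", desc, row, os, cur)
    }

  // `out` now holds the finished parent-1 object (with its CHILD array);
  // `pending` still holds parent 2, which buildBody flushes at the end.
  println(out.toStream.toList)
}
```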
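`getConnection`, shown earlier, now centralizes the URL/database juggling that used to live in `withConnectionFromRequest`: the database name is appended only when the URL does not already end with it. A quick sketch of the equivalent call forms (the H2 URL and credentials are illustrative):

```scala
import com.precog.tools.importers.jdbc.ImportJdbc.getConnection

object GetConnectionSketch extends App {
  Class.forName("org.h2.Driver")
  // Both resolve to the same effective URL "jdbc:h2:~/mydb" ("mydb" is illustrative):
  val c1 = getConnection("jdbc:h2:~/", "sa", "", Some("mydb"))
  val c2 = getConnection("jdbc:h2:~/mydb", "sa", "", Some("mydb"))
  // With None the URL is used as given, e.g. for the catalog-discovery connection:
  val c0 = getConnection("jdbc:h2:~/mydb", "sa", "", None)
  List(c0, c1, c2).foreach(_.close())
}
```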
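One caveat in the service change above: `withConnectionFromRequest` closes the connection inside a `flatMap`, which never runs if the request future fails, so the connection leaks on errors. A hedged alternative sketch that registers the close as an `onComplete` callback (the same Either-based callback the console importer uses), so it runs on either outcome:

```scala
import java.sql.Connection
import akka.dispatch.Future
import blueeyes.core.http.{HttpRequest, HttpResponse}
import com.precog.tools.importers.jdbc.ImportJdbc.getConnection

// Sketch only: same shape as the trait method, but leak-free on failure.
def withConnectionFromRequest[T](r: HttpRequest[T])(f: (Connection, HttpRequest[T]) => Future[HttpResponse[T]]): Future[HttpResponse[T]] = {
  val c = getConnection(
    r.parameters('dbUrl),
    r.parameters.get('user).getOrElse(null),
    r.parameters.get('password).getOrElse(null),
    r.parameters.get('database))
  val result = f(c, r)
  result.onComplete(_ => c.close()) // fires for both Right and Left outcomes
  result
}
```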
@@ -36,7 +41,7 @@ class ImportJdbcServiceTest extends BlueEyesServiceSpecification with ImportJdbc "Table metadata" should { "get tables " in new Conn{ val dbName ="tmd" tblA; tblB - val r=client.parameters('dbUrl-> dbUrl(dbName)).get[ByteChunk]("/metadata/databases/%s/tables".format(dbName)) + val r=client.parameters('dbUrl-> dbUrl(dbName)).get[ByteChunk](servicePrefix+"/metadata/databases/%s/tables".format(dbName)) Await.result(r,1 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== """["A","B"]""" } @@ -44,7 +49,7 @@ class ImportJdbcServiceTest extends BlueEyesServiceSpecification with ImportJdbc "get single table desc w/o relations" in new Conn{ val dbName ="t1wor" tblA;tblB - val r= client.parameters('dbUrl-> dbUrl(dbName)).get[ByteChunk]("/metadata/databases/%s/tables/A".format(dbName)) + val r= client.parameters('dbUrl-> dbUrl(dbName)).get[ByteChunk](servicePrefix+"/metadata/databases/%s/tables/A".format(dbName)) Await.result(r,1 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== """[{"name":"A","columns":["ID","NAME"],"base":"A"}]""" } @@ -54,7 +59,7 @@ class ImportJdbcServiceTest extends BlueEyesServiceSpecification with ImportJdbc tblA;tblB cnstrBfkA - val r=client.parameters('dbUrl-> dbUrl(dbName)).get[ByteChunk]("/metadata/databases/%s/tables/A".format(dbName)) + val r=client.parameters('dbUrl-> dbUrl(dbName)).get[ByteChunk](servicePrefix+"/metadata/databases/%s/tables/A".format(dbName)) Await.result(r,1 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== """[{"name":"A","columns":["ID","NAME"],"base":"A"},{"name":"B","columns":["ID","A_ID","NAME"],"join":{"baseColName":"ID","refKey":{"table":"B","columnName":"A_ID"},"exported":true}}]""" @@ -64,7 +69,7 @@ class ImportJdbcServiceTest extends BlueEyesServiceSpecification with ImportJdbc "get table desc with inferred relations w/o sampling" in new Conn{ val dbName ="t1wir" tblA;tblB - val r=client.parameters('dbUrl-> dbUrl(dbName),'infer->"y").get[ByteChunk]("/metadata/databases/%s/tables/A".format(dbName)) + val r=client.parameters('dbUrl-> dbUrl(dbName),'infer->"y").get[ByteChunk](servicePrefix+"/metadata/databases/%s/tables/A".format(dbName)) Await.result(r,1 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== """[{"name":"A","columns":["ID","NAME"],"base":"A"},{"name":"B","columns":["ID","A_ID","NAME"],"join":{"baseColName":"ID","refKey":{"table":"B","columnName":"A_ID"},"exported":true}}]""" @@ -74,7 +79,7 @@ class ImportJdbcServiceTest extends BlueEyesServiceSpecification with ImportJdbc "get table desc with inferred relations with sampling - no data" in new Conn{ val dbName = "t1wirsnd" tblA;tblB - val r=client.parameters('dbUrl-> dbUrl(dbName),'infer->"y", 'sample->"y").get[ByteChunk]("/metadata/databases/%s/tables/A".format(dbName)) + val r=client.parameters('dbUrl-> dbUrl(dbName),'infer->"y", 'sample->"y").get[ByteChunk](servicePrefix+"/metadata/databases/%s/tables/A".format(dbName)) Await.result(r,1 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== """[{"name":"A","columns":["ID","NAME"],"base":"A"}]""" @@ -84,7 +89,7 @@ class ImportJdbcServiceTest extends BlueEyesServiceSpecification with ImportJdbc "get table desc with inferred relations with sampling - with data" in new Conn{ val dbName ="t1wirsd" tblA;tblB; 
dataA; dataB - val r=client.parameters('dbUrl-> dbUrl(dbName),'infer->"y", 'sample->"y").get[ByteChunk]("/metadata/databases/%s/tables/A".format(dbName)) + val r=client.parameters('dbUrl-> dbUrl(dbName),'infer->"y", 'sample->"y").get[ByteChunk](servicePrefix+"/metadata/databases/%s/tables/A".format(dbName)) Await.result(r,1 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== """[{"name":"A","columns":["ID","NAME"],"base":"A"},{"name":"B","columns":["ID","A_ID","NAME"],"join":{"baseColName":"ID","refKey":{"table":"B","columnName":"A_ID"},"exported":true}}]""" @@ -102,10 +107,10 @@ class ImportJdbcServiceTest extends BlueEyesServiceSpecification with ImportJdbc 'q->"select * from A,B where A.ID = B.A_ID", 'apiKey->apiKey, 'path -> basePath - ).post[ByteChunk]("/ingest/%s/query".format(dbName))(Array.empty[Byte]) + ).post[ByteChunk](servicePrefix+"/ingest/%s/query".format(dbName))(Array.empty[Byte]) Await.result(r,1 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== - """{"failed":0,"skipped":0,"errors":[],"total":1,"ingested":1}""" + """{"ingested":3,"errors":[]}""" } } @@ -115,27 +120,28 @@ class ImportJdbcServiceTest extends BlueEyesServiceSpecification with ImportJdbc 'dbUrl-> dbUrl(dbName), 'denormalize->"y", 'apiKey->apiKey, - 'path -> basePath //path('database / "table" / 'table / "auto") { - ).post[ByteChunk]("/ingest/%s/table/%s/auto".format(dbName,"A"))(Array.empty[Byte]) + 'path -> basePath + ).post[ByteChunk](servicePrefix+"/ingest/%s/table/%s/auto".format(dbName,"A"))(Array.empty[Byte]) Await.result(r,1 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== - """{"failed":0,"skipped":0,"errors":[],"total":1,"ingested":1}""" + """{"ingested":3,"errors":[]}""" } } - //TODO: fix the closed connection issue with the test - /*"ingest with config" in new Conn{ val dbName ="iwcfg" + "ingest with config" in new Conn{ val dbName ="iwcfg" + import DefaultBijections.jvalueToChunk tblA;tblB; dataA; dataB + val r=client.parameters( 'dbUrl-> dbUrl(dbName), 'apiKey->apiKey, - 'path -> basePath //path('database / "table" / 'table / "auto") { - ).post[ByteChunk]("/ingest/%s/table/%s/config".format(dbName,"A"))(JValueToByteArray(tblABDesc)) - Await.result(r,1 minute) must beLike { + 'path -> basePath + ).post[ByteChunk](servicePrefix+"/ingest/%s/table/%s/config".format(dbName,"A"))(ingestInfo2Json(tblABDesc)) + Await.result(r,2 minute) must beLike { case HttpResponse(_ ,_,Some(Left(buffer)),_) => new String(buffer.array(), "UTF-8") must_== - """{"failed":0,"skipped":0,"errors":[],"total":1,"ingested":1}""" + """{"ingested":3,"errors":[]}""" } - }*/ + } } } diff --git a/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcSpec.scala b/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcSpec.scala new file mode 100644 index 0000000..8c1f6ea --- /dev/null +++ b/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcSpec.scala @@ -0,0 +1,165 @@ +package com.precog.tools.importers.jdbc + +import org.specs2.mutable.Specification +import blueeyes.json._ +import Datatypes._ +import akka.dispatch.{Future, Await} +import blueeyes.akka_testing.FutureMatchers +import blueeyes.core.http.test.HttpRequestMatchers +import scala.Left +import com.precog.tools.importers.jdbc.ImportJdbc.IngestInfo +import scala.Right +import scala.Some +import 
com.precog.tools.importers.jdbc.ImportJdbc.ImportTable +import blueeyes.bkka.AkkaDefaults._ +import blueeyes.core.http.HttpResponse +import scalaz.{Id,StreamT} +import scalaz.effect.IO +import blueeyes.bkka.FutureMonad + +/** + * User: gabriel + * Date: 11/22/12 + */ +class ImportJdbcSpec extends Specification with FutureMatchers with HttpRequestMatchers{ + + + "build queries" should { + "single table query" in { + ImportJdbc.buildQuery(tblADesc) must_== "select a.ID, a.name from A a order by a.ID, a.name" + } + "one to many query" in { + ImportJdbc.buildQuery(tblABDesc) must_== + "select a.ID, a.name, b.ID, b.A_ID, b.name from A a left join B b on a.ID=b.A_ID order by a.ID, a.name, b.ID, b.A_ID, b.name" + } + + "many to many query" in { + ImportJdbc.buildQuery(tblCABDesc) must_== + "select c.A_ID, c.B_ID, c.name, a.ID, a.name, b.ID, b.A_ID, b.name "+ + "from C c left join A a on c.A_ID=a.ID left join B b on c.B_ID=b.ID " + + "order by c.A_ID, c.B_ID, c.name, a.ID, a.name, b.ID, b.A_ID, b.name" + } + + "circular query" in { + ImportJdbc.buildQuery(tblDDesc) must_== + "select dparent.ID, dparent.D_ID, dparent.name, dchild.ID, dchild.name " + + "from D dparent left join D dchild on dparent.ID=dchild.D_ID " + + "order by dparent.ID, dparent.D_ID, dparent.name, dchild.ID, dchild.name" + } + } + + + //implicit def toStreamElem[T](l:List[T])=l.toSeq::StreamT.empty + + "Json build from data" should { + //(baseName:String, ingestInfo:IngestInfo, row: Seq[String], outStream:StreamT[M,JValue], currentObj:StrJVMap=Map()) + val empty=StreamT.empty[Id.Id,JValue] + "build a simple Json" in { + ImportJdbc.mkJson("a",tblADesc,aData,empty) must_== (empty, Map("A" -> JObject(Map("ID" -> JString("1"), "name" -> JString("aaa"))))) + } + + "build a composite Json" in { + ImportJdbc.mkJson("a",tblABDesc,aData++bData,empty) must_== (empty, + Map( + "A" -> JObject(Map("ID" -> JString("1"), "name" -> JString("aaa"))), + "B" -> JArray(List(JObject(Map("ID" -> JString("2"), "A_ID" -> JString("1"), "name" -> JString("bbb"))))) + )) + } + + "build a relation Json" in { + ImportJdbc.mkJson("c",tblCABDesc, cData++aData++bData, empty) must_== ( empty, + Map( + "A" -> JObject(Map("ID" -> JString("1"), "name" -> JString("aaa"))), + "B" -> JObject(Map("ID" -> JString("2"), "A_ID" -> JString("1"), "name" -> JString("bbb"))), + "C" -> JObject(Map("A_ID" -> JString("1"), "B_ID" -> JString("2"), "name" -> JString("ccc"))) + )) + } + + val tblDesc = IngestInfo(List(ImportTable("parent",List("ID","name"), Left(Table("Parent"))),ImportTable("child",List("ID","name","P_ID"), Right(Join("id",Key(Table("child"),"parent_id"),ExportedKey))))) + val dataChld1 = List("1","parent","1","child1","1") + val dataNoChld = List("1","parent",null,null,null) + val dataChld2 = List("1","parent","2","child2","1") + val dataParent3 = List("3","parent3","2","child2","1") + + + + "build Jobjects for multiple values" in { + + val(stream1,map1)=ImportJdbc.mkJson("parent",tblDesc,dataNoChld,StreamT.empty) + map1 must_== + Map("PARENT"->JObject(JField("ID",JString("1"))::JField("name",JString("parent"))::Nil),"CHILD"->JArray(Nil)) + + + val (stream2,map2)=ImportJdbc.mkJson("parent",tblDesc,dataChld2,stream1,map1) + map2 must_== + Map( + "PARENT"-> JObject(JField("ID",JString("1"))::JField("name",JString("parent"))::Nil), + "CHILD"-> JArray( + JObject(JField("ID",JString("2"))::JField("name",JString("child2"))::JField("P_ID",JString("1"))::Nil)::Nil) + ) + + + ImportJdbc.mkJson("parent",tblDesc,dataParent3,stream2,map2)._2 must_== + Map( + 
"PARENT"->JObject(JField("ID",JString("3"))::JField("name",JString("parent3"))::Nil), + "CHILD"->JArray( + JObject(JField("ID",JString("2"))::JField("name",JString("child2"))::JField("P_ID",JString("1"))::Nil)::Nil) + ) + } + + "buildBody for multiple values" in { + val tblDesc = IngestInfo(List(ImportTable("parent",List("ID","name"), Left(Table("Parent"))),ImportTable("child",List("ID","name","P_ID"), Right(Join("id",Key(Table("child"),"parent_id"),ExportedKey))))) + val dataChld1 = List("1","parent1","1","child1","1") + val dataChld2 = List("1","parent1","2","child2","1") + val dataNoChld = List("2","parent2",null,null,null) + val dataParent3 = List("3","parent3","2","child2","1") + + //val allData= StreamT.fromIterable((dataChld1::dataChld2::dataNoChld::dataParent3::Nil).reverse.map( _.toIndexedSeq).toIterable) + val allData= dataChld1::dataChld2::dataNoChld::dataParent3::StreamT.empty[IO,Seq[String]] + + implicit val executionContext = defaultFutureDispatch + implicit val futureMonad= new FutureMonad(executionContext) + + val r= ImportJdbc.buildBody(allData,"parent",tblDesc) + Await.result(r.flatMap(_.toStream),1 minute) must_==( + JObject(JField("ID",JString("1"))::JField("name",JString("parent1")):: + JField("CHILD",JArray( + JObject(JField("ID",JString("1"))::JField("name",JString("child1"))::JField("P_ID",JString("1"))::Nil):: + JObject(JField("ID",JString("2"))::JField("name",JString("child2"))::JField("P_ID",JString("1"))::Nil):: + Nil) + )::Nil):: + JObject(JField("ID",JString("2"))::JField("name",JString("parent2"))::JField("CHILD",JArray(Nil))::Nil):: + JObject(JField("ID",JString("3"))::JField("name",JString("parent3")):: + JField("CHILD",JArray( + JObject(JField("ID",JString("2"))::JField("name",JString("child2"))::JField("P_ID",JString("1"))::Nil)::Nil) + )::Nil)::Nil).reverse + } + } + + + "Ingest data" should { + + implicit val executionContext = defaultFutureDispatch + + + "ingest a single table" in new Conn{ val dbName ="single_ingest" + tblA + dataA + val r=ImportJdbc.ingest(conn,"a",ImportJdbc.buildQuery(tblADesc),Some(tblADesc),basePath,host,apiKey) + Await.result(r,1 minute) must beLike { + case HttpResponse(_ ,_,Some(Left(buffer)),_) => { new String(buffer.array(), "UTF-8") must_== """{"ingested":3,"errors":[]}"""} + } + } + + "ingest composite tables" in new Conn{ val dbName ="composite_ingest" + tblA; tblB + dataA; dataB + cnstrBfkA + + val r=ImportJdbc.ingest(conn,"a",ImportJdbc.buildQuery(tblABDesc),Some(tblABDesc),basePath,host,apiKey) + Await.result(r,1 minute) must beLike { + case HttpResponse(_ ,_,Some(Left(buffer)),_) => { new String(buffer.array(), "UTF-8") must_== """{"ingested":3,"errors":[]}"""} + } + } + } +} \ No newline at end of file diff --git a/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcTest.scala b/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcTest.scala deleted file mode 100644 index 053aa31..0000000 --- a/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/ImportJdbcTest.scala +++ /dev/null @@ -1,125 +0,0 @@ -package com.precog.tools.importers.jdbc - -import org.specs2.mutable.Specification -import blueeyes.json._ -import Datatypes._ -import ImportJdbc.{IngestInfo, ImportTable} -import akka.dispatch.Await -import akka.util.Duration -import blueeyes.core.data.DefaultBijections._ -import blueeyes.akka_testing.FutureMatchers -import blueeyes.core.http.test.HttpRequestMatchers -import blueeyes.core.data._ -import scala.Left -import 
com.precog.tools.importers.jdbc.ImportJdbc.IngestInfo -import scala.Right -import scala.Some -import com.precog.tools.importers.jdbc.ImportJdbc.ImportTable -import blueeyes.bkka.AkkaDefaults._ -import blueeyes.core.http.{HttpStatus, HttpResponse} -import blueeyes.core.http.HttpStatusCodes.OK - -/** - * User: gabriel - * Date: 11/22/12 - */ -class ImportJdbcTest extends Specification with FutureMatchers with HttpRequestMatchers{ - - "build queries" should { - "single table query" in { - ImportJdbc.buildQuery(tblADesc) must_== "select a.ID, a.name from A a order by a.ID, a.name" - } - "one to many query" in { - ImportJdbc.buildQuery(tblABDesc) must_== - "select a.ID, a.name, b.ID, b.A_ID, b.name from A a left join B b on a.ID=b.A_ID order by a.ID, a.name, b.ID, b.A_ID, b.name" - } - - "many to many query" in { - ImportJdbc.buildQuery(tblCABDesc) must_== - "select c.A_ID, c.B_ID, c.name, a.ID, a.name, b.ID, b.A_ID, b.name "+ - "from C c left join A a on c.A_ID=a.ID left join B b on c.B_ID=b.ID " + - "order by c.A_ID, c.B_ID, c.name, a.ID, a.name, b.ID, b.A_ID, b.name" - } - - "circular query" in { - ImportJdbc.buildQuery(tblDDesc) must_== - "select dparent.ID, dparent.D_ID, dparent.name, dchild.ID, dchild.name " + - "from D dparent left join D dchild on dparent.ID=dchild.D_ID " + - "order by dparent.ID, dparent.D_ID, dparent.name, dchild.ID, dchild.name" - } - } - - "Json build from data" should { - "build a simple Json" in { - ImportJdbc.mkPartialJson("a",tblADesc,aData)._1 must_== jA - } - - "build a composite Json" in { - ImportJdbc.mkPartialJson("a",tblABDesc,aData++bData)._1 must_== jAB - } - - "build a relation Json" in { - ImportJdbc.mkPartialJson("c",tblCABDesc,cData++aData++bData)._1 must_== jC - } - - "build a JArray for multiple values" in { - val tblDesc = IngestInfo(List(ImportTable("parent",List("ID","name"), Left(Table("Parent"))),ImportTable("child",List("ID","name","P_ID"), Right(Join("id",Key(Table("child"),"parent_id"),ExportedKey))))) - val dataChld1 = List("1","parent","1","child1","1") - val dataNoChld = List("1","parent",null,null,null) - val dataChld2 = List("1","parent","2","child2","1") - val dataParent3 = List("3","parent3","2","child2","1") - - val (emptyChildJson,_)=ImportJdbc.mkPartialJson("parent",tblDesc,dataNoChld) - emptyChildJson must_== - JObject(JField("ID",JString("1"))::JField("name",JString("parent"))::JField("child",JArray(Nil))::Nil) - - - val (partJson,m)=ImportJdbc.mkPartialJson("parent",tblDesc,dataChld2) - partJson must_== - JObject(JField("ID",JString("1"))::JField("name",JString("parent")):: - JField("child",JArray( - JObject(JField("ID",JString("2"))::JField("name",JString("child2"))::JField("P_ID",JString("1"))::Nil)::Nil) - )::Nil) - - val (d1Json,m1)=ImportJdbc.mkPartialJson("parent",tblDesc,dataChld1,m) - d1Json must_== - JObject(JField("ID",JString("1"))::JField("name",JString("parent")):: - JField("child",JArray( - JObject(JField("ID",JString("1"))::JField("name",JString("child1"))::JField("P_ID",JString("1"))::Nil):: - JObject(JField("ID",JString("2"))::JField("name",JString("child2"))::JField("P_ID",JString("1"))::Nil)::Nil) - )::Nil) - - ImportJdbc.mkPartialJson("parent",tblDesc,dataParent3,m1)._1 must_== JObject(JField("ID",JString("3"))::JField("name",JString("parent3")):: - JField("child",JArray( - JObject(JField("ID",JString("2"))::JField("name",JString("child2"))::JField("P_ID",JString("1"))::Nil)::Nil) - )::Nil) - } - } - - "Ingest data" should { - - implicit val executionContext = defaultFutureDispatch - - - "ingest a single 
table" in new Conn{ val dbName ="single_ingest" - tblA - dataA - val r=ImportJdbc.ingest(conn,"a",ImportJdbc.buildQuery(tblADesc),Some(tblADesc),basePath,host,apiKey) - Await.result(r,1 minute) must beLike { - case HttpResponse(_ ,_,Some(Left(buffer)),_) => { new String(buffer.array(), "UTF-8") must_== """{"failed":0,"skipped":0,"errors":[],"total":1,"ingested":1}"""} - } - } - - "ingest composite tables" in new Conn{ val dbName ="composite_ingest" - tblA; tblB - dataA; dataB - cnstrBfkA - - val r=ImportJdbc.ingest(conn,"a",ImportJdbc.buildQuery(tblABDesc),Some(tblABDesc),basePath,host,apiKey) - Await.result(r,1 minute) must beLike { - case HttpResponse(_ ,_,Some(Left(buffer)),_) => { new String(buffer.array(), "UTF-8") must_== """{"failed":0,"skipped":0,"errors":[],"total":1,"ingested":1}"""} - } - } - } -} - diff --git a/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/package.scala b/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/package.scala index 0499cf3..e738af4 100644 --- a/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/package.scala +++ b/tools/import/jdbc/src/test/scala/com/precog/tools/importers/jdbc/package.scala @@ -18,9 +18,9 @@ package object jdbc { Class.forName("org.h2.Driver") // use api key and dispatch to call ingest - val host="http://beta.precog.com" - val apiKey="43AB865E-BB86-4F74-A57E-7E8BBD77F2B5" - val basePath="/0000000457/data" + val host="https://devapi.precog.com" + val apiKey="0A24F09F-19CB-45D0-8BFA-543C61BA5EE6" + val basePath="/0000000075/data" def tblA(implicit conn:Connection) = conn.createStatement().execute(" create table A( id int primary key, name varchar(10) ) ") def tblB(implicit conn:Connection) = conn.createStatement().execute(" create table B( id int primary key, a_id int, name varchar(10)) ") @@ -77,8 +77,8 @@ package object jdbc { val jA = JObject(JField("ID",JString("1"))::JField("name",JString("aaa"))::Nil) val jB =JObject(JField("ID",JString("2"))::JField("A_ID",JString("1"))::JField("name",JString("bbb"))::Nil) - val jAB = JObject(JField("ID",JString("1"))::JField("name",JString("aaa"))::JField("b",JArray(jB::Nil))::Nil) - val jC = JObject(JField("A_ID",JString("1"))::JField("B_ID",JString("2"))::JField("name",JString("ccc"))::JField("a",jA)::JField("b",JObject(JField("ID",JString("2"))::JField("A_ID",JString("1"))::JField("name",JString("bbb"))::Nil))::Nil) + val jAB = JObject(JField("ID",JString("1"))::JField("name",JString("aaa"))::JField("B",JArray(jB::Nil))::Nil) + val jC = JObject(JField("A_ID",JString("1"))::JField("B_ID",JString("2"))::JField("name",JString("ccc"))::JField("A",jA)::JField("B",JObject(JField("ID",JString("2"))::JField("A_ID",JString("1"))::JField("name",JString("bbb"))::Nil))::Nil) //def getConn(db:String)=DriverManager.getConnection("jdbc:h2:~/%s".format(db)) @@ -94,6 +94,4 @@ package object jdbc { } } - def manageConn(s:String)= new Conn{ val dbName=s } - } diff --git a/tools/import/mongodb/README.md b/tools/import/mongodb/README.md new file mode 100644 index 0000000..e69de29 diff --git a/tools/import/mongodb/build.sbt b/tools/import/mongodb/build.sbt new file mode 100644 index 0000000..f0fe6cb --- /dev/null +++ b/tools/import/mongodb/build.sbt @@ -0,0 +1,30 @@ +import AssemblyKeys._ + +name := "import-mongodb" + +organization := "org.precog" + +version := "0.2" + +scalaVersion := "2.9.2" + +resolvers ++= Seq( + "ReportGrid (public)" at "http://nexus.reportgrid.com/content/repositories/public-releases", + "Sonatype" at 
"http://oss.sonatype.org/content/repositories/public", + "Typesafe" at "http://repo.typesafe.com/typesafe/releases/", + "Typesafe-snapshots" at "http://repo.typesafe.com/typesafe/snapshots/", + "Scala Tools" at "http://scala-tools.org/repo-snapshots/" +) + +scalacOptions ++= Seq("-unchecked", "-deprecation") + +assemblySettings + +libraryDependencies ++= Seq( + "com.reportgrid" %% "blueeyes-core" % "1.0.0-M8.1", + "com.reportgrid" %% "blueeyes-json" % "1.0.0-M8.1", + "com.reportgrid" %% "blueeyes-mongo" % "1.0.0-M8.1", + "org.mongodb" %% "casbah" % "2.3.0", + "org.specs2" %% "specs2" % "1.12.2" % "test", + "ch.qos.logback" % "logback-classic" % "1.0.0" +) diff --git a/tools/import/mongodb/project/Build.scala b/tools/import/mongodb/project/Build.scala new file mode 100644 index 0000000..92ee8dd --- /dev/null +++ b/tools/import/mongodb/project/Build.scala @@ -0,0 +1,9 @@ +import sbt._ +object MongoImportProj extends Build +{ + lazy val root = + Project("import-mongo", file(".")) dependsOn(common) + lazy val common = + ProjectRef(uri("../common/"), "import-common") +} + diff --git a/tools/import/mongodb/project/plugins.sbt b/tools/import/mongodb/project/plugins.sbt new file mode 100644 index 0000000..d79423c --- /dev/null +++ b/tools/import/mongodb/project/plugins.sbt @@ -0,0 +1,2 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.7") + diff --git a/tools/import/mongodb/src/main/resources/application.conf b/tools/import/mongodb/src/main/resources/application.conf new file mode 100644 index 0000000..85664e8 --- /dev/null +++ b/tools/import/mongodb/src/main/resources/application.conf @@ -0,0 +1,8 @@ +blueeyes-async { + name = "DefaultActorPool" + keep-alive-time = 5s + core-pool-size-factor = 1.0 + core-pool-size-max = 8 + max-pool-size-factor = 1.0 + max-pool-size-max = 8 +} \ No newline at end of file diff --git a/tools/import/mongodb/src/main/scala/com/precog/tools/importers/mongo/ImportMongo.scala b/tools/import/mongodb/src/main/scala/com/precog/tools/importers/mongo/ImportMongo.scala new file mode 100644 index 0000000..5ed50c2 --- /dev/null +++ b/tools/import/mongodb/src/main/scala/com/precog/tools/importers/mongo/ImportMongo.scala @@ -0,0 +1,244 @@ +package com.precog.tools.importers.mongo + +import com.mongodb.casbah.Imports._ +import blueeyes.persistence.mongo.json.BijectionsMongoJson._ +import com.mongodb.casbah.commons.TypeImports.ObjectId +import scalaz._ +import akka.dispatch.Future +import blueeyes.json._ +import blueeyes.bkka.AkkaDefaults._ +import blueeyes.bkka.AkkaDefaults.defaultFutureDispatch +import blueeyes.bkka.FutureMonad +import blueeyes.core.http.HttpResponse +import com.precog.tools.importers.common._ +import ConsoleUtils._ +import com.mongodb +import mongodb.casbah.MongoURI +import java.util.Date +import java.util +import collection.JavaConversions._ + +import scala.Left +import scala.Some +import com.precog.tools.importers.common.Ingest._ +import org.slf4j.LoggerFactory + + +/** + * User: gabriel + * Date: 1/17/13 + */ +object ImportMongo { +/* +Next version: +every time it loads: +configuration exists? 
diff --git a/tools/import/mongodb/src/main/scala/com/precog/tools/importers/mongo/ImportMongo.scala b/tools/import/mongodb/src/main/scala/com/precog/tools/importers/mongo/ImportMongo.scala
new file mode 100644
index 0000000..5ed50c2
--- /dev/null
+++ b/tools/import/mongodb/src/main/scala/com/precog/tools/importers/mongo/ImportMongo.scala
@@ -0,0 +1,244 @@
+package com.precog.tools.importers.mongo
+
+import com.mongodb.casbah.Imports._
+import blueeyes.persistence.mongo.json.BijectionsMongoJson._
+import com.mongodb.casbah.commons.TypeImports.ObjectId
+import scalaz._
+import akka.dispatch.Future
+import blueeyes.json._
+import blueeyes.bkka.AkkaDefaults._
+import blueeyes.bkka.AkkaDefaults.defaultFutureDispatch
+import blueeyes.bkka.FutureMonad
+import blueeyes.core.http.HttpResponse
+import com.precog.tools.importers.common._
+import ConsoleUtils._
+import com.mongodb
+import mongodb.casbah.MongoURI
+import java.util.Date
+import java.util
+import collection.JavaConversions._
+
+import scala.Left
+import scala.Some
+import com.precog.tools.importers.common.Ingest._
+import org.slf4j.LoggerFactory
+
+
+/**
+ * User: gabriel
+ * Date: 1/17/13
+ */
+object ImportMongo {
+/*
+Next version:
+every time it loads:
+does a configuration exist?
+if no, prompt to run the configuration
+if yes, check for new collections and prompt to configure those
+
+after each run, update the configuration with the latest state
+
+use configrity for configuration
+*/
+  private lazy val logger = LoggerFactory.getLogger("com.precog.tools.importers.mongo.ImportMongo")
+
+  implicit val as = actorSystem
+  implicit val executionContext = defaultFutureDispatch
+  implicit val M: Monad[Future] = new FutureMonad(executionContext)
+
+  val collsConfig = "precog_import_config"
+  val sampleSize = 100
+
+
+  def matchValid[T](value:Any, fi: =>T, fl: =>T, ff: =>T, fd: =>T, fs: =>T, foid: =>T, fdt: =>T, fno: =>T = null)={
+    value match {
+      case i: java.lang.Integer => fi
+      case l: java.lang.Long => fl
+      case f: java.lang.Float => ff
+      case d: java.lang.Double => fd
+      case s: java.lang.String => fs
+      case oid: ObjectId => foid
+      case dt: Date => fdt
+      case _ if fno != null => fno
+    }
+  }
+
+  // Not @tailrec, but we don't expect MongoDB to hand back a document hierarchy deep enough to blow the stack
+  def columnsOf(bObject: MongoDBObject): Seq[String]={
+    bObject.flatMap(kv => kv._2 match {
+      case m:MongoDBObject => columnsOf(m).map("%s.%s".format(kv._1,_))
+      case _ => Set(kv._1)
+    }).toSeq
+  }
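matchValid above is a by-name dispatch on the runtime type of a Mongo value; configureCollections (below) uses it as a predicate for "usable as an import control column". A minimal sketch of that use, with hypothetical values:

    import com.mongodb.casbah.commons.TypeImports.ObjectId

    // Returns the final fallback argument (here: false) for anything that is not an
    // Integer/Long/Float/Double/String/ObjectId/Date.
    def sortable(v: Any): Boolean =
      ImportMongo.matchValid(v, true, true, true, true, true, true, true, false)

    sortable(new ObjectId)                  // true: ObjectIds are roughly time-ordered
    sortable(java.util.Arrays.asList(1, 2)) // false: an array can't drive incremental reads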
+
+  def sampleColumns(mongoConn:MongoConnection)(db: MongoDB, coll: String):Set[String]={
+    val collection = db(coll).find().take(sampleSize)
+    collection.foldLeft(Set[String]())((s,o) => s ++ columnsOf(o))
+  }
+
+  def configureCollections(mongoConn:MongoConnection)(db: MongoDB):Seq[DBObject]={
+    println("No configuration found in the mongo instance, creating a new one.")
+    println("DATABASE %s \n".format(db.name))
+    val userCollections = db.getCollectionNames().filter(name => !(name.startsWith("system.") || name.startsWith(collsConfig)))
+    val colls = selectSet("collection", userCollections.toSeq)
+    colls.map( coll => {
+      println("\n ---- Collection %s ----".format(coll))
+      val columns = sampleColumns(mongoConn)(db,coll).toSeq
+      val fields = selectSet("column", columns)
+
+      val sortColumns = db(coll).find().take(sampleSize).map(mobj => mobj.toMap).reduceLeft(_++_).filter( kv => matchValid(kv._2, true, true, true, true, true, true, true, false) )
+
+      val sortColumn = selectOne("import control column", sortColumns.keys.toSeq)
+      MongoDBObject("collection"->coll, "fields"->fields, "sortColumn"->sortColumn)
+    })
+  }
+
+  def pair[T](getter: String=>T)(name:String) = (name -> getter(name))
+
+  def getString(jo: JObject)(field:String) = strValue(jo \ field)
+  def getArray(jo: JObject)(field:String) = arrOfStrValues(jo \ field)
+
+  def strValue(jv: JValue) = (jv --> classOf[JString]).value
+  def arrOfStrValues(jv: JValue) = (jv -->? classOf[JArray]).map(_.elements.map(strValue(_))).getOrElse(Nil)
+
+
+  def importCollection(host:String, basePath:String, apiKey:String, db:MongoDB, mdbobj: MongoDBObject, mongoConn: MongoConnection):Future[(Either[String,String],AnyRef)]={
+
+    val collName = mdbobj.getAs[String]("collection").get
+    val fieldNames = mdbobj.getAsOrElse[util.ArrayList[String]]("fields", new util.ArrayList())
+    val lastId = mdbobj.getAs[String]("lastId")
+    val sortColumn = mdbobj.getAs[String]("sortColumn").get
+
+    logger.info("Ingesting %s since %s of %s".format(collName,lastId,sortColumn))
+
+    val fdsid = Future { dsZipMaxIds(db, collName, sortColumn, fieldNames, lastId) }
+    val (fds, fmaxId) = (fdsid map (_._1), fdsid map (_._2))
+
+    val fjsons = fds.map(_.flatMap(MongoToJson(_).toStream))
+    val path = "%s/%s/%s".format(basePath, db.name, collName)
+    val data = StreamT.fromStream[Future, JObject](fjsons)
+    val fsend = data.isEmpty.flatMap( isEmpty =>
+      if (isEmpty) Future(Left("No new data found in %s.%s".format(db.name,collName)))
+      else sendToPrecog(host,path,apiKey,toByteStream(data),streaming=false) flatMap( _ match {
+        case HttpResponse(status, _, Some(Left(buffer)), _) => {
+          Future(Right("Result from precog: %s (%s)".format(new String(buffer.array(), "UTF-8"), status)))
+        }
+        case HttpResponse(_, _, Some(Right(stream)), _) => {
+          stream.toStream.map( strmBuffer => Right("Streaming result from precog: %s".format(strmBuffer.foldLeft("")( (str,b) => str + new String(b.array(), "UTF-8")))))
+        }
+        case result => Future(Left("Error: %s".format(result.toString())))
+      })
+    )
+    M.lift2((a: Either[String,String], b: AnyRef) => (a, b))(fsend, fmaxId)
+  }
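For reference, the per-collection configuration documents that configureCollections saves into precog_import_config — and that importCollection reads back — have this shape (the collection and field names here are illustrative; "lastId" only appears once a successful run writes back the maximum sortColumn value seen):

    import com.mongodb.casbah.Imports._

    val config = MongoDBObject(
      "collection" -> "events",            // hypothetical collection name
      "fields"     -> List("user", "ts"),  // columns chosen at configuration time
      "sortColumn" -> "_id"                // import control column
    )
    // after a run, main() re-saves it as: config ++ ("lastId" -> <max sortColumn seen>)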
+ def refine[T<:Comparable[T]](f: AnyRef=>T)=oids.map(f).max + matchValid(oids.head, + refine({case ss:java.lang.Integer => ss}), + refine({case ss:java.lang.Long => ss}), + refine({case ss:java.lang.Float => ss}), + refine({case ss:java.lang.Double => ss}), + refine({case ss:String => ss}), + refine({case ss:ObjectId => ss}), + refine({case ss:Date => ss})) + } + (dataStrm, maxOid) + } + + def readFromMongo[A : AsQueryParam](mongoDB: MongoDB, collName: String, idCol:String, fieldNames:Seq[String], oLastId:Option[A]=None:Option[ObjectId]):Stream[DBObject]={ + val mongoColl = mongoDB(collName) + val q = oLastId.map( idCol $gt _ ).getOrElse(MongoDBObject()) + val fields = MongoDBObject(fieldNames.map(_->""):_*) + mongoColl.find(q,fields).toStream + } + + def main(args:Array[String]){ + + if (args.length != 4) { + println("Wrong number of parameters.") + println("Usage: ImportMongo mongo_uri precog_host_url precog_ingest_path precog_apiKey") + actorSystem.shutdown() + sys.exit(1) + } + + val mongoUri=args(0) + + val precogHost=args(1) + val basePath=args(2) + val apiKey=args(3) + try { + val uri = MongoURI(mongoUri) + + val mongoConn=MongoConnection(uri) + uri.database.map { database => + + val db = mongoConn(database) + for { + user <- uri.username + password <- uri.password + } { + db.authenticate(user, password.mkString) + } + + val inputConfigColl=db(collsConfig) + + if (inputConfigColl.isEmpty) { + val configs=configureCollections(mongoConn)(db) + configs.map(inputConfigColl.save(_)) + } + + val jsonInputs= inputConfigColl.find().toList + + val fimports=jsonInputs.map(config=> { + + val collName = config.getAs[String]("collection").get + val lastId = config.getAs[String]("lastId") + val sortColumn=config.getAs[String]("sortColumn").get + println("Ingesting %s since %s of %s".format(collName,lastId,sortColumn)) + + importCollection(precogHost,basePath,apiKey,db, config, mongoConn) + }) + + Future.sequence(fimports).onComplete( x => {x match { + case Right(results) => { + jsonInputs.zip(results).map( {case (mDbObj,(result,lastId)) => + result.left.map(s=>{ + val result="%s".format(s) + logger.warn(result) + println(result) + } + ).right.map(s=>{ + inputConfigColl.save(mDbObj++("lastId"->lastId)) + val result="%s".format(s) + logger.info(result) + println(result) + } + ) + } + ) + } + case Left(e) => logger.error("Exception during import ",e) + } + actorSystem.shutdown() + } + ) + } + } catch { + case e:Throwable => { + logger.error("General exception during import",e) + actorSystem.shutdown() + } + } + } +} diff --git a/tools/import/mongodb/src/test/scala/com/precog/tools/importers/mongo/ImportMongoSpec.scala b/tools/import/mongodb/src/test/scala/com/precog/tools/importers/mongo/ImportMongoSpec.scala new file mode 100644 index 0000000..aa0b82e --- /dev/null +++ b/tools/import/mongodb/src/test/scala/com/precog/tools/importers/mongo/ImportMongoSpec.scala @@ -0,0 +1,117 @@ +package com.precog.tools.importers.mongo + +import com.mongodb.casbah.Imports._ +import blueeyes.persistence.mongo.RealMongoSpecSupport +import com.mongodb.casbah.MongoDB +import org.specs2.mutable.After +import org.specs2.specification.Scope + + +/** + * User: gabriel + * Date: 3/29/13 + */ +class ImportMongoSpec extends RealMongoSpecSupport { + + trait Mongo extends After with Scope { + + def dbName:String + + implicit lazy val testDb= MongoDB(realMongo, dbName ) + + def after{ + testDb.dropDatabase() + } + } + + + //def readFromMongo(mongoDB: MongoDB, collName: String, idCol:String, oLastId:Option[AnyRef], 
diff --git a/tools/import/mongodb/src/test/scala/com/precog/tools/importers/mongo/ImportMongoSpec.scala b/tools/import/mongodb/src/test/scala/com/precog/tools/importers/mongo/ImportMongoSpec.scala
new file mode 100644
index 0000000..aa0b82e
--- /dev/null
+++ b/tools/import/mongodb/src/test/scala/com/precog/tools/importers/mongo/ImportMongoSpec.scala
@@ -0,0 +1,117 @@
+package com.precog.tools.importers.mongo
+
+import com.mongodb.casbah.Imports._
+import blueeyes.persistence.mongo.RealMongoSpecSupport
+import com.mongodb.casbah.MongoDB
+import org.specs2.mutable.After
+import org.specs2.specification.Scope
+
+
+/**
+ * User: gabriel
+ * Date: 3/29/13
+ */
+class ImportMongoSpec extends RealMongoSpecSupport {
+
+  trait Mongo extends After with Scope {
+
+    def dbName:String
+
+    implicit lazy val testDb = MongoDB(realMongo, dbName)
+
+    def after {
+      testDb.dropDatabase()
+    }
+  }
+
+
+  //def readFromMongo[A : AsQueryParam](mongoDB: MongoDB, collName: String, idCol:String, fieldNames:Seq[String], oLastId:Option[A]):Stream[DBObject]
+  "read from mongo" should {
+    "return only selected columns" in new Mongo {
+      val dbName = "t1"
+      val newObj = MongoDBObject("a" -> "1", "x" -> "y", "b" -> 3, "spam" -> "eggs")
+      testDb("test1").save(newObj)
+      val r = ImportMongo.readFromMongo(testDb,"test1","_id", Seq("a","b"))
+      r.head must_== MongoDBObject("_id"->newObj("_id"),"a" -> "1", "b" -> 3)
+    }
+
+    "return the whole collection if no last id" in new Mongo {
+      val dbName = "t2"
+      val data = List(MongoDBObject("a" -> 1),MongoDBObject("a" -> 2),MongoDBObject("a" -> 3),MongoDBObject("a" -> 4))
+      data.foreach( testDb("test2").save(_) )
+      val r = ImportMongo.readFromMongo(testDb,"test2","a",Seq("a"))
+      r must containTheSameElementsAs(data)
+    }
+
+    "return only new rows" in new Mongo {
+      val dbName = "t3"
+      val (d1,d2,d3,d4) = (MongoDBObject("a" -> 1),MongoDBObject("a" -> 2),MongoDBObject("a" -> 3),MongoDBObject("a" -> 4))
+      val data = List(d1,d2,d3,d4)
+      data.foreach( testDb("test3").save(_) )
+      ImportMongo.readFromMongo(testDb,"test3","a", Seq("a")) must containTheSameElementsAs(data)
+      val r = ImportMongo.readFromMongo(testDb,"test3","a", Seq("a"),Some(2))
+      r must containTheSameElementsAs(List(d3,d4))
+    }
+
+    "return empty if no new rows" in new Mongo {
+      val dbName = "t4"
+      val (d1,d2,d3,d4) = (MongoDBObject("a" -> 1),MongoDBObject("a" -> 2),MongoDBObject("a" -> 3),MongoDBObject("a" -> 4))
+      val data = List(d1,d2,d3,d4)
+      data.foreach( testDb("test4").save(_) )
+      val r = ImportMongo.readFromMongo(testDb,"test4","a", Seq("a"),Some(4))
+      r must be empty
+    }
+  }
+
+  "columns of" should {
+    "return no columns for the empty object" in {
+      ImportMongo.columnsOf(MongoDBObject()) must be empty
+    }
+
+    "return the set of columns of an object" in {
+      ImportMongo.columnsOf(MongoDBObject("a"->1,"c"->3, "column"->"zzzz")) must containTheSameElementsAs(Seq("c","column","a"))
+    }
+  }
+
+
+  "sample columns" should {
+    "return no columns when the collection is empty" in new Mongo {
+      val dbName = "t5"
+      val data = List()
+      data.foreach( testDb("test5").save(_) )
+      val conn = new MongoConnection(realMongo)
+      val cols = ImportMongo.sampleColumns(conn)(testDb,"test5")
+      cols must be empty
+    }
+
+    "identify all the columns with a collection smaller than the sample size" in new Mongo {
+      val dbName = "t6"
+      val data = List(MongoDBObject("a" -> 1,"b"->"a"),MongoDBObject("a" -> 2,"b"->"b"),MongoDBObject("a" -> 3,"b"->"c"),MongoDBObject("a" -> 4,"b"->"d"))
+      data.foreach( testDb("test6").save(_) )
+
+      val conn = new MongoConnection(realMongo)
+      val cols = ImportMongo.sampleColumns(conn)(testDb,"test6")
+      cols must_== Set("_id","a","b")
+    }
+
+    "identify all the columns with a collection bigger than the sample size" in new Mongo {
+      val dbName = "t7"
+      (1 to 2*ImportMongo.sampleSize).foreach (i => {testDb("test7").save(MongoDBObject("data" -> "asdb","idx"->i))})
+
+      val conn = new MongoConnection(realMongo)
+      val cols = ImportMongo.sampleColumns(conn)(testDb,"test7")
+      cols must_== Set("_id","data","idx")
+    }
+
+    "identify all the columns for collections with different objects" in new Mongo {
+      val dbName = "t8"
+      val data = List(MongoDBObject("a" -> 1,"b"->"a"),MongoDBObject("c" -> 2,"d"->"b"),MongoDBObject("a" -> 3,"b"->"c","z"->123),MongoDBObject("a" -> 4,"b"->"d"))
+      data.foreach( testDb("test8").save(_) )
+
+      val conn = new MongoConnection(realMongo)
+      val cols = ImportMongo.sampleColumns(conn)(testDb,"test8")
+      cols must_== Set("_id","a","b","c","d","z")
+    }
+  }
+
+}
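The Mongo trait above gives each example its own database and drops it when the example finishes, so new cases follow the same pattern. A sketch of an additional example in that style (hypothetical, not part of this commit):

    "isolate data per database" in new Mongo {
      val dbName = "t9"
      testDb("test9").save(MongoDBObject("a" -> 1))
      ImportMongo.readFromMongo(testDb, "test9", "_id", Seq("a")) must have size(1)
    }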
diff --git a/tools/import/project/Build.scala b/tools/import/project/Build.scala
new file mode 100644
index 0000000..9fa3a6c
--- /dev/null
+++ b/tools/import/project/Build.scala
@@ -0,0 +1,12 @@
+import sbt._
+
+object ImportToolsBuild extends Build {
+
+  lazy val root = Project(id = "import-tools", base = file(".")) aggregate(common, jdbc, mongo)
+
+  lazy val common = Project(id = "import-common", base = file("common"))
+
+  lazy val mongo = Project(id = "import-mongodb", base = file("mongodb")) dependsOn("import-common")
+
+  lazy val jdbc = Project(id = "import-jdbc", base = file("jdbc")) dependsOn("import-common")
+}
diff --git a/tools/import/project/plugins.sbt b/tools/import/project/plugins.sbt
new file mode 100644
index 0000000..3ad699a
--- /dev/null
+++ b/tools/import/project/plugins.sbt
@@ -0,0 +1,3 @@
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
+
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.7")
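Tying it together: ImportMongo.main expects exactly four positional arguments, as validated at the top of main. A hypothetical invocation (the URI, path, and key below are placeholders, not real credentials):

    ImportMongo.main(Array(
      "mongodb://user:pass@localhost:27017/mydb",  // mongo_uri
      "https://devapi.precog.com",                 // precog_host_url
      "/0000000075/data",                          // precog_ingest_path
      "00000000-0000-0000-0000-000000000000"       // precog_apiKey
    ))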